forked from sky-big/RabbitMQ
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrabbit_amqqueue_process.erl
1825 lines (1525 loc) · 78.1 KB
/
rabbit_amqqueue_process.erl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
%% The contents of this file are subject to the Mozilla Public License
%% Version 1.1 (the "License"); you may not use this file except in
%% compliance with the License. You may obtain a copy of the License
%% at http://www.mozilla.org/MPL/
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and
%% limitations under the License.
%%
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
%%
-module(rabbit_amqqueue_process).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
-behaviour(gen_server2).
-define(SYNC_INTERVAL, 200). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster
-export([info_keys/0]).
-export([init_with_backing_queue_state/7]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
prioritise_cast/3, prioritise_info/3, format_message_queue/2]).
%% Queue's state
-record(q, {
q, %% 队列信息数据结构amqqueue
exclusive_consumer, %% 当前队列的独有消费者
has_had_consumers, %% 当前队列中是否有消费者的标识
backing_queue, %% backing_queue对应的模块名字
backing_queue_state, %% backing_queue对应的状态结构
consumers, %% 消费者存储的优先级队列
expires, %% 当前队列未使用就删除自己的时间
sync_timer_ref, %% 同步confirm的定时器,当前队列大部分接收一次消息就要确保当前定时器的存在(200ms的定时器)
rate_timer_ref, %% 队列中消息进入和出去的速率定时器
expiry_timer_ref, %% 队列中未使用就删除自己的定时器
stats_timer, %% 向rabbit_event发布信息的数据结构状态字段
msg_id_to_channel, %% 当前队列进程中等待confirm的消息gb_trees结构,里面的结构是Key:MsgId Value:{SenderPid, MsgSeqNo}
ttl, %% 队列中设置的消息存在的时间
ttl_timer_ref, %% 队列中消息存在的定时器
ttl_timer_expiry, %% 当前队列头部消息的过期时间点
senders, %% 向当前队列发送消息的rabbit_channel进程列表
dlx, %% 死亡消息要发送的exchange交换机(通过队列声明的参数或者policy接口来设置)
dlx_routing_key, %% 死亡消息要发送的路由规则(通过队列声明的参数或者policy接口来设置)
max_length, %% 当前队列中消息的最大上限(通过队列声明的参数或者policy接口来设置)
max_bytes, %% 队列中消息内容占的最大空间
args_policy_version, %% 当前队列中参数设置对应的版本号,每设置一次都会将版本号加一
status %% 当前队列的状态
}).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(init_with_backing_queue_state/7 ::
(rabbit_types:amqqueue(), atom(), tuple(), any(),
[rabbit_types:delivery()], pmon:pmon(), dict:dict()) -> #q{}).
-endif.
%%----------------------------------------------------------------------------
%% 统计消息队列相关信息的关键key列表
-define(STATISTICS_KEYS,
[
name,
policy,
exclusive_consumer_pid,
exclusive_consumer_tag,
messages_ready,
messages_unacknowledged,
messages,
consumers,
consumer_utilisation,
memory,
slave_pids,
synchronised_slave_pids,
recoverable_slaves,
state
]).
%% 消息队列创建的时候向rabbit_event事件中心发布的信息关键key列表
-define(CREATION_EVENT_KEYS,
[
name,
durable,
auto_delete,
arguments,
owner_pid
]).
%% 消息队列所有信息的关键key列表
-define(INFO_KEYS, [pid | ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [name]]).
%%----------------------------------------------------------------------------
%% 列出队列宏定义里关键key对应的信息,包括backing_queue的关键key
info_keys() -> ?INFO_KEYS ++ rabbit_backing_queue:info_keys().
%% 列出消息队列统计信息的关键key列表,包括backing_queue的关键key
statistics_keys() -> ?STATISTICS_KEYS ++ rabbit_backing_queue:info_keys().
%%----------------------------------------------------------------------------
%% 队列进程的初始化函数,该进程启动在rabbit_amqqueue_sup监督进程下
init(Q) ->
process_flag(trap_exit, true),
%% 通过队列的名字保存当前队列进程的进程名字
?store_proc_name(Q#amqqueue.name),
%% 每次队列重启或者第一次启动都会在此处更新当前队列的Pid
{ok, init_state(Q#amqqueue{pid = self()}), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE},
%% 在此处返回最新的回调模块(此功能是gen_server2行为新增加的功能,在此处改变gen_server2的回调模块)
?MODULE}.
%% 初始化队列的基本信息
init_state(Q) ->
%% 组装队列状态的数据结构
State = #q{q = Q,
exclusive_consumer = none,
has_had_consumers = false,
consumers = rabbit_queue_consumers:new(),
senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
status = running,
args_policy_version = 0},
%% 初始化向rabbit_event事件中心发布事件的数据结构
rabbit_event:init_stats_timer(State, #q.stats_timer).
%% 初始化队列,如果该消息队列没有独有拥有者,则直接后续的初始化
init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) ->
init_it2(Recover, From, State);
%% You used to be able to declare an exclusive durable queue. Sadly we
%% need to still tidy up after that case, there could be the remnants
%% of one left over from an upgrade. So that's why we don't enforce
%% Recover = new here.
%% 如果当前队列设置有exclusive_owner拥有者(该拥有者是rabbit_reader进程Pid)
%% 如果拥有者死亡,将当前队列进程立刻停止,missing_owner的DOWN原因会将当前队列的信息全部删除,包括持久化队列信息
init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
case rabbit_misc:is_process_alive(Owner) of
%% 如果拥有者当前是存活的,则队列继续下面的初始化
true -> erlang:monitor(process, Owner),
init_it2(Recover, From, State);
%% 拥有者已经死亡
false -> #q{backing_queue = undefined,
backing_queue_state = undefined,
q = Q} = State,
%% 通知让自己初始化的进程,拥有者已经死亡
send_reply(From, {owner_died, Q}),
BQ = backing_queue_module(Q),
%% 根据恢复信息,将当前backing_queue的数据恢复出来,当队列处理missing_owner的时候,会将backing_queue的数据清除掉
{_, Terms} = recovery_status(Recover),
BQS = bq_init(BQ, Q, Terms),
%% Rely on terminate to delete the queue.
%% 将当前队列进程立刻停止,missing_owner的DOWN原因会将当前队列的信息全部删除,包括持久化队列信息
{stop, {shutdown, missing_owner},
State#q{backing_queue = BQ, backing_queue_state = BQS}}
end.
%% 消息队列进程启动后,通知队列进程进行相关的初始化(如果该队列是持久化队列,且上次RabbitMQ系统崩溃,则在此处可以恢复)
init_it2(Recover, From, State = #q{q = Q,
backing_queue = undefined,
backing_queue_state = undefined}) ->
{Barrier, TermsOrNew} = recovery_status(Recover),
%% 将新增加的队列存入rabbit_queue数据库表(如果当前队列不是重新创建,则只用更新队列的状态为live,如果是新的队列,则将队列的数据写入到数据库中)
case rabbit_amqqueue:internal_declare(Q, Recover /= new) of
#amqqueue{} = Q1 ->
%% Q和Q1中的字段进行比较
case matches(Recover, Q, Q1) of
true ->
%% 向file_handle_cache进行注册
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [self()]),
%% rabbit_memory_monitor进程通知消息队列最新的内存持续时间
ok = rabbit_memory_monitor:register(
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
%% 拿到rabbit应用中backing_queue_module对应的配置参数(现在是rabbit_priority_queue),如果队列是镜像队列,则为rabbit_mirror_queue_master
BQ = backing_queue_module(Q1),
%% 根据得到的模块名(rabbit_priority_queue)进行相关的初始化
BQS = bq_init(BQ, Q, TermsOrNew),
%% 通知启动该队列进程{new, Q}消息结构
send_reply(From, {new, Q}),
%% 恢复屏障,阻塞等待rabbit进程发送go的消息(如果是恢复持久化队列,则该队列进程阻塞等待启动进程发过来的go消息)
recovery_barrier(Barrier),
%% 通过队列声明时候的参数或者通过rabbitmqctl设置过来的参数初始化队列状态
State1 = process_args_policy(
State#q{backing_queue = BQ,
backing_queue_state = BQS}),
%% 通知队列修饰模块startup启动事件
notify_decorators(startup, State),
%% 发布队列创建的事件
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
%% 如果配置文件配置,则将当前队列的信息发布到rabbit_event中
rabbit_event:if_enabled(State1, #q.stats_timer,
fun() -> emit_stats(State1) end),
noreply(State1);
false ->
{stop, normal, {existing, Q1}, State}
end;
Err ->
{stop, normal, Err, State}
end.
%% barrier:屏障
recovery_status(new) -> {no_barrier, new};
recovery_status({Recover, Terms}) -> {Recover, Terms}.
%% 向From返回Q
send_reply(none, _Q) -> ok;
send_reply(From, Q) -> gen_server2:reply(From, Q).
%% 判断Q1和Q2队列是否匹配
matches(new, Q1, Q2) ->
%% i.e. not policy
Q1#amqqueue.name =:= Q2#amqqueue.name andalso
Q1#amqqueue.durable =:= Q2#amqqueue.durable andalso
Q1#amqqueue.auto_delete =:= Q2#amqqueue.auto_delete andalso
Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso
Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids;
matches(_, Q, Q) -> true;
matches(_, _Q, _Q1) -> false.
%% 恢复屏障,阻塞等待rabbit进程发送go的消息
recovery_barrier(no_barrier) ->
ok;
recovery_barrier(BarrierPid) ->
MRef = erlang:monitor(process, BarrierPid),
receive
{BarrierPid, go} -> erlang:demonitor(MRef, [flush]);
{'DOWN', MRef, process, _, _} -> ok
end.
%% 副镜像队列成为主镜像队列后会调用该接口进行backing_queue的状态的初始化
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
RateTRef, Deliveries, Senders, MTC) ->
case Owner of
none -> ok;
_ -> erlang:monitor(process, Owner)
end,
%% 初始化队列的基本信息
State = init_state(Q),
%% 初始化backing_queue模块名字和状态数据结构,速率定时器,rabbit_channel进程列表,以及消息confirm相关的数据结构
State1 = State#q{backing_queue = BQ,
backing_queue_state = BQS,
rate_timer_ref = RateTRef,
senders = Senders,
msg_id_to_channel = MTC},
%% 通过队列声明时候的参数或者通过rabbitmqctl设置过来的参数初始化队列状态
State2 = process_args_policy(State1),
State3 = lists:foldl(fun (Delivery, StateN) ->
deliver_or_enqueue(Delivery, true, StateN)
end, State2, Deliveries),
%% 通知队列修饰模块当前主镜像队列已经启动
notify_decorators(startup, State3),
State3.
%% 队列中断的接口(shutdown的原因,则消息队列直接停止,不会将自己的信息全部删除)
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
%% 队列进程关闭,原因是丢失拥有者,拥有者丢失后,立刻将队列删除掉,会将该队列直接从RabbitMQ系统中删除,包括持久化队列
terminate({shutdown, missing_owner} = Reason, State) ->
%% if the owner was missing then there will be no queue, so don't emit stats
terminate_shutdown(terminate_delete(false, Reason, State), State);
%% 队列进程关闭,消息队列直接停止,不会将自己的信息全部删除
terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
%% 消息队列正常终止,会将该队列直接从RabbitMQ系统中删除,包括持久化队列
terminate(normal, State) -> %% delete case
terminate_shutdown(terminate_delete(true, normal, State), State);
%% If we crashed don't try to clean up the BQS, probably best to leave it.
%% 其它原因致使队列进程中断
terminate(_Reason, State = #q{q = Q}) ->
terminate_shutdown(fun (BQS) ->
%% 现将队列的状态置为crashed
Q2 = Q#amqqueue{state = crashed},
%% 将amqqueue的数据结构写入mnesia数据库(如果需要持久化的则写入rabbit_durable_queue数据表)
rabbit_misc:execute_mnesia_transaction(
fun() ->
rabbit_amqqueue:store_queue(Q2)
end),
BQS
end, State).
%% 将该队列的backing_queue的数据删除掉,同时将当前队列从Mnesia数据库中删除掉
terminate_delete(EmitStats, Reason,
State = #q{q = #amqqueue{name = QName},
backing_queue = BQ}) ->
fun (BQS) ->
%% 先让backing_queue执行delete_and_terminate操作
BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% 将当前队列的状态发布到rabbit_event事件中心中
if EmitStats -> rabbit_event:if_enabled(State, #q.stats_timer,
fun() -> emit_stats(State) end);
true -> ok
end,
%% 然后执行内部的队列删除操作
%% don't care if the internal delete doesn't return 'ok'.
rabbit_amqqueue:internal_delete(QName),
BQS1
end.
%% 先将当前队列中所有的定时器立刻关闭
%% 队列进程从rabbit_memory_monitor进程中取消注册,将所有的消费者信息发布到rabbit_event事件中心,然后执行Fun函数
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS, consumers = Consumers} =
lists:foldl(fun (F, S) -> F(S) end, State,
[%% 停止同步confirm的定时器
fun stop_sync_timer/1,
%% 停止速率监控定时器
fun stop_rate_timer/1,
%% 将队列expires未使用就删除的定时器停止掉
fun stop_expiry_timer/1,
%% 停止消息存在时间过期删除消息的定时器
fun stop_ttl_timer/1]),
case BQS of
undefined -> State1;
_ -> %% 取消在rabbit_memory_monitor内存速率监控进程的注册
ok = rabbit_memory_monitor:deregister(self()),
%% 取得当前队列的队列名字
QName = qname(State),
%% 通知队列的修饰模块,队列进程终止
notify_decorators(shutdown, State),
%% 将当前队列的所有消费者被删除的信息发布到rabbit_event
[emit_consumer_deleted(Ch, CTag, QName) ||
{Ch, CTag, _, _, _} <-
rabbit_queue_consumers:all(Consumers)],
%% 对backing_queue_state的状态执行Fun函数
State1#q{backing_queue_state = Fun(BQS)}
end.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%----------------------------------------------------------------------------
maybe_notify_decorators(false, State) -> State;
maybe_notify_decorators(true, State) -> notify_decorators(State), State.
%% 通知队列修饰模块Event事件
notify_decorators(Event, State) -> decorator_callback(qname(State), Event, []).
notify_decorators(State = #q{consumers = Consumers,
backing_queue = BQ,
backing_queue_state = BQS}) ->
P = rabbit_queue_consumers:max_active_priority(Consumers),
decorator_callback(qname(State), consumer_state_changed,
[P, BQ:is_empty(BQS)]).
decorator_callback(QName, F, A) ->
%% Look up again in case policy and hence decorators have changed
case rabbit_amqqueue:lookup(QName) of
{ok, Q = #amqqueue{decorators = Ds}} ->
[ok = apply(M, F, [Q | A]) || M <- rabbit_queue_decorator:select(Ds)];
{error, not_found} ->
ok
end.
%% 根据得到的backing_queue模块名BQ进行相关的初始化
bq_init(BQ, Q, Recover) ->
Self = self(),
BQ:init(Q, Recover,
fun (Mod, Fun) ->
rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
end).
%% 通过队列声明时候的参数或者通过rabbitmqctl设置过来的参数初始化队列状态
process_args_policy(State = #q{q = Q,
args_policy_version = N}) ->
%% 第二个参数是解决队列参数和队列policy设置的值的冲突解决函数
ArgsTable =
[
%% 控制queue被自动删除前可以处于未使用状态的时间,未使用的意思是queue上没有任何consumer,queue没有被重新声明,并且在过期时间段内未调用过basic.get命令
{<<"expires">>, fun res_min/2, fun init_exp/2},
%% 将死的信息重新发布到该exchange交换机上(死信息包括:1.消息被拒绝(basic.reject or basic.nack);2.消息TTL过期;3.队列达到最大长度)
{<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2},
%% 将死的新重新发布的时候的路由规则
{<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2},
%% 控制被publish到queue中的 message 被丢弃前能够存活的时间
{<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
%% 当前队列最大消息数量参数
{<<"max-length">>, fun res_min/2, fun init_max_length/2},
%% 当前队列中消息的内容最大上限
{<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}
],
drop_expired_msgs(
lists:foldl(fun({Name, Resolve, Fun}, StateN) ->
Fun(args_policy_lookup(Name, Resolve, Q), StateN)
end, State#q{args_policy_version = N + 1}, ArgsTable)).
%% 解决rabbit_policy定制的当前队列的策略和消息队列自己带的策略的冲突的解决函数,如果两边都有Name的配置,则调用Resolve函数进行解决
args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) ->
%% 队列声明初始化的时候会在参数前面加上"x-"前缀
AName = <<"x-", Name/binary>>,
case {rabbit_policy:get(Name, Q), rabbit_misc:table_lookup(Args, AName)} of
{undefined, undefined} -> undefined;
{undefined, {_Type, Val}} -> Val;
{Val, undefined} -> Val;
{PolVal, {_Type, ArgVal}} -> Resolve(PolVal, ArgVal)
end.
%% 以队列声明的时候参数为准
res_arg(_PolVal, ArgVal) -> ArgVal.
%% 获取最小值
res_min(PolVal, ArgVal) -> erlang:min(PolVal, ArgVal).
%% In both these we init with the undefined variant first to stop any
%% existing timer, then start a new one which may fire after a
%% different time.
%% 如果expires处于未使用删除队列时间没有设置,则将expires定时器停止掉
init_exp(undefined, State) -> stop_expiry_timer(State#q{expires = undefined});
init_exp(Expires, State) -> State1 = init_exp(undefined, State),
%% 启动队列未使用就删除的定时器
ensure_expiry_timer(State1#q{expires = Expires}).
%% 初始化队列中消息存在的时间
init_ttl(undefined, State) -> stop_ttl_timer(State#q{ttl = undefined});
init_ttl(TTL, State) -> (init_ttl(undefined, State))#q{ttl = TTL}.
%% 初始化队列死亡消息要发送的exchange交换机
init_dlx(undefined, State) ->
State#q{dlx = undefined};
init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
State#q{dlx = rabbit_misc:r(QName, exchange, DLX)}.
%% 初始化队列死亡消息要发送的路由规则
init_dlx_rkey(RoutingKey, State) -> State#q{dlx_routing_key = RoutingKey}.
%% 根据参数初始化当前队列中消息的最大上限
init_max_length(MaxLen, State) ->
{_Dropped, State1} = maybe_drop_head(State#q{max_length = MaxLen}),
State1.
%% 初始化队列中消息内容占的最大空间
init_max_bytes(MaxBytes, State) ->
{_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}),
State1.
%% 队列进程处理消息后,返回消息给发送进程,同时如果需要做同步操作则进行同步操作
reply(Reply, NewState) ->
{NewState1, Timeout} = next_state(NewState),
{reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
%% 队列进程处理消息后,没有必要返回消息给发送进程,同时如果需要做同步操作则进行同步操作
noreply(NewState) ->
{NewState1, Timeout} = next_state(NewState),
{noreply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
next_state(State = #q{backing_queue = BQ,
backing_queue_state = BQS,
msg_id_to_channel = MTC}) ->
%% 进行断言判断,如果有消费者同时消息队列中有消息,则认为出错了,有消费者同时又有消息,则是不应该出现的情况
assert_invariant(State),
%% 从backing_queue得到已经confirm的消息ID列表
{MsgIds, BQS1} = BQ:drain_confirmed(BQS),
%% 将得到的已经confirm消息ID发送到rabbit_channel进程进程confirm操作
MTC1 = confirm_messages(MsgIds, MTC),
State1 = State#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1},
%% 通过backing_queue模块判断是否需要同步confirm操作
case BQ:needs_timeout(BQS1) of
%% 如果消息索引模块中有没有尚未confirm的消息,则停止同步的定时器
false -> {stop_sync_timer(State1), hibernate };
%% 如果消息索引模块中有没有尚未confirm的消息,则停止同步的定时器,但是定一个200ms的超时,进行后续的同步操作
idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL};
%% 消息索引模块中有尚未confirm的消息,则开启同步的定时器
timed -> {ensure_sync_timer(State1), 0 }
end.
%% 拿到rabbit应用中backing_queue_module对应的配置参数(现在是rabbit_priority_queue),如果队列是镜像队列,则为rabbit_mirror_queue_master
backing_queue_module(Q) ->
case rabbit_mirror_queue_misc:is_mirrored(Q) of
%% 该队列没有设置镜像队列,则拿到backing_queue的模块为rabbit_priority_queue
false -> {ok, BQM} = application:get_env(backing_queue_module),
BQM;
%% 当期队列是镜像队列,且该队列为主镜像队列
true -> rabbit_mirror_queue_master
end.
%% 启动同步confirm的定时器
ensure_sync_timer(State) ->
rabbit_misc:ensure_timer(State, #q.sync_timer_ref,
?SYNC_INTERVAL, sync_timeout).
%% 停止同步confirm的定时器
stop_sync_timer(State) -> rabbit_misc:stop_timer(State, #q.sync_timer_ref).
%% 启动速率监控的定时器
ensure_rate_timer(State) ->
rabbit_misc:ensure_timer(State, #q.rate_timer_ref,
?RAM_DURATION_UPDATE_INTERVAL,
update_ram_duration).
%% 停止速率监控定时器
stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #q.rate_timer_ref).
%% We wish to expire only when there are no consumers *and* the expiry
%% hasn't been refreshed (by queue.declare or basic.get) for the
%% configured period.
%% 将队列expires未使用就删除的定时器开启
ensure_expiry_timer(State = #q{expires = undefined}) ->
State;
ensure_expiry_timer(State = #q{expires = Expires,
args_policy_version = Version}) ->
case is_unused(State) of
%% 如果没有消费者,则将当前未使用定时器停止掉,重新启动定时器,如果在这个时间段没有basic_get操作,则将队列删除掉
true -> NewState = stop_expiry_timer(State),
rabbit_misc:ensure_timer(NewState, #q.expiry_timer_ref,
Expires, {maybe_expire, Version});
false -> State
end.
%% 将队列expires未使用就删除的定时器停止掉
stop_expiry_timer(State) -> rabbit_misc:stop_timer(State, #q.expiry_timer_ref).
%% 启动消息过期检查的定时器(将队列头部的第一个消息的过期时间拿出来设置过期时间定时器)
ensure_ttl_timer(undefined, State) ->
State;
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined,
args_policy_version = Version}) ->
%% 消息中存储的过期时间是微秒单位
After = (case Expiry - now_micros() of
V when V > 0 -> V + 999; %% always fire later
_ -> 0
end) div 1000,
%% 向自己当前的队列进程发送消息过期消息
TRef = rabbit_misc:send_after(After, self(), {drop_expired, Version}),
State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
%% 新传入的消息只有小于当前消息的过期时间,才取消定时器,重新设置定时器
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
ttl_timer_expiry = TExpiry})
%% 如果当前的消息过期时间TExpiry大于Expiry加上一毫秒
when Expiry + 1000 < TExpiry ->
%% 则取消当前的定时器,用Expiry这个小的时间开启定时器
rabbit_misc:cancel_timer(TRef),
%% 使用新的时间设置消息过期定时器
ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined});
ensure_ttl_timer(_Expiry, State) ->
State.
%% 停止消息存在时间过期删除消息的定时器
stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref).
%% 确认状态定时器的启动
ensure_stats_timer(State) ->
rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats).
%% 进行断言判断,如果有消费者同时消息队列中有消息,则认为出错了,有消费者同时又有消息,则是不应该出现的情况
assert_invariant(State = #q{consumers = Consumers}) ->
true = (rabbit_queue_consumers:inactive(Consumers) orelse is_empty(State)).
%% 判断当前消息队列是否为空
is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
%% 队列中的消息为空后,将当前队列中credit中的mode为drain的消费者取出来,通知这些消费者队列中的消息为空
maybe_send_drained(WasEmpty, State) ->
case (not WasEmpty) andalso is_empty(State) of
%% 当前情况是原始状态的队列是非空的,但是做了相关的操作后成为空队列
true -> notify_decorators(State),
rabbit_queue_consumers:send_drained();
false -> ok
end,
State.
%% 进行消息的confirm
confirm_messages([], MTC) ->
MTC;
confirm_messages(MsgIds, MTC) ->
{CMs, MTC1} =
lists:foldl(
fun(MsgId, {CMs, MTC0}) ->
%% 从未confirm的gb_trees结构中查找该消息对应的信息
case gb_trees:lookup(MsgId, MTC0) of
{value, {SenderPid, MsgSeqNo}} ->
%% 将rabbit_channel进程的Pid对应的键值对更新插入MsgSeqNo
{rabbit_misc:gb_trees_cons(SenderPid,
MsgSeqNo, CMs),
%% 将该消息ID对应的key-value键值对从gb_trees结构中删除掉
gb_trees:delete(MsgId, MTC0)};
none ->
{CMs, MTC0}
end
end, {gb_trees:empty(), MTC}, MsgIds),
%% 将得到已经confirm的消息confirm号发送到rabbit_channel进程
rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
MTC1.
%% 消息队列记录要confirm的消息,如果confirm为false,则不记录要confirm
send_or_record_confirm(#delivery{confirm = false}, State) ->
{never, State};
%% 如果confirm为true,同时durable为true,则将消息记录要msg_id_to_channel字段
send_or_record_confirm(#delivery{confirm = true,
sender = SenderPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
is_persistent = true,
id = MsgId}},
State = #q{q = #amqqueue{durable = true},
msg_id_to_channel = MTC}) ->
%% 将需要confirm的消息插入msg_id_to_channel平衡二叉树字段中
MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC),
%% 表示消息是持久化消息且消息队列也是持久化队列,需要等到消息存储到磁盘才能够进行confirm操作
{eventually, State#q{msg_id_to_channel = MTC1}};
%% 如果消息需要confirm,但是队列不是持久化队列,则通知rabbit_channel进程已经收到该消息,即向rabbit_channel进程进行confirm
send_or_record_confirm(#delivery{confirm = true,
sender = SenderPid,
msg_seq_no = MsgSeqNo}, State) ->
%% 向rabbit_channel进程发送confirm消息
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
%% 消息是非持久化消息,但是需要进行confirm,则当前已经到消息队列,可以立刻通知rabbit_channel进程进行confirm操作
{immediately, State}.
%% 如果当前消息mandatory字段为true,则立刻通知该消息对应的rabbit_channel进程
send_mandatory(#delivery{mandatory = false}) ->
ok;
send_mandatory(#delivery{mandatory = true,
sender = SenderPid,
msg_seq_no = MsgSeqNo}) ->
gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).
%% 丢弃消息
discard(#delivery{confirm = Confirm,
sender = SenderPid,
flow = Flow,
message = #basic_message{id = MsgId}}, BQ, BQS, MTC) ->
%% 丢弃的消息如果需要进行confirm操作,则立刻通知rabbit_channel进程
MTC1 = case Confirm of
%% 进行消息的confirm
true -> confirm_messages([MsgId], MTC);
false -> MTC
end,
%% backing_queue消息丢弃则不做任何操作
BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS),
{BQS1, MTC1}.
%% 当前队列有没有被锁住的消费者,同时队列中还有消息,则将消息一条条的下发给没有锁住的消费者
run_message_queue(State) -> run_message_queue(false, State).
run_message_queue(ActiveConsumersChanged, State) ->
case is_empty(State) of
%% 如果消息队列为空,则不继续向消费者发送消息
true -> maybe_notify_decorators(ActiveConsumersChanged, State);
false -> case rabbit_queue_consumers:deliver(
%% 从backing_queue模块中取得一个消息
fun(AckRequired) -> fetch(AckRequired, State) end,
qname(State), State#q.consumers) of
%% 传递成功后,继续将消息发送给消费者,如果成功则不断的往后发送直到没有消息或者没有满足条件的消费者
{delivered, ActiveConsumersChanged1, State1, Consumers} ->
run_message_queue(
ActiveConsumersChanged or ActiveConsumersChanged1,
State1#q{consumers = Consumers});
%% 传递失败后,则不再继续将消息发送给消费者
{undelivered, ActiveConsumersChanged1, Consumers} ->
maybe_notify_decorators(
ActiveConsumersChanged or ActiveConsumersChanged1,
State#q{consumers = Consumers})
end
end.
%% 尝试将消息发送给当前队列中的消费者
attempt_delivery(Delivery = #delivery{sender = SenderPid,
flow = Flow,
message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS,
msg_id_to_channel = MTC}) ->
%% 检查当前队列中的消费者,如果有能够发送的消费者,则直接发送给消费者,如果没有满足条件的消费者,则将消息存入到队列中
case rabbit_queue_consumers:deliver(
%% 需要确认的消息需要发送到backing_queue模块中记录下来
fun (true) -> %% 如果当前能够将消息直接发送给消费者,则当前队列必须是为空
true = BQ:is_empty(BQS),
{AckTag, BQS1} =
%% 将消息发送到backing_queue中(将已经发给消费者等待ack的消息通知backing_queue)
BQ:publish_delivered(
Message, Props, SenderPid, Flow, BQS),
{{Message, Delivered, AckTag}, {BQS1, MTC}};
(false) -> %% 将消息发送给消费者后,如果消息不要确认,则直接将消息丢弃掉
{{Message, Delivered, undefined},
discard(Delivery, BQ, BQS, MTC)}
end, qname(State), State#q.consumers) of
%% 已经将消息发送给消费者
{delivered, ActiveConsumersChanged, {BQS1, MTC1}, Consumers} ->
{delivered, maybe_notify_decorators(
ActiveConsumersChanged,
State#q{backing_queue_state = BQS1,
msg_id_to_channel = MTC1,
consumers = Consumers})};
%% 没有能够接收消息的消费者
{undelivered, ActiveConsumersChanged, Consumers} ->
{undelivered, maybe_notify_decorators(
ActiveConsumersChanged,
State#q{consumers = Consumers})}
end.
%% 传递给消费者或者没有消费者则将消息存储在队列中
deliver_or_enqueue(Delivery = #delivery{message = Message,
sender = SenderPid,
flow = Flow},
Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
%% 如果当前消息mandatory字段为true,则立刻通知该消息对应的rabbit_channel进程
send_mandatory(Delivery), %% must do this before confirms
%% 消息队列记录要confirm的消息,如果confirm为false,则不记录要confirm(如果消息需要进行confirm,则将该消息的信息存入msg_id_to_channel字段中)
{Confirm, State1} = send_or_record_confirm(Delivery, State),
%% 得到消息特性特性数据结构
Props = message_properties(Message, Confirm, State1),
%% 让backing_queue去判断当前消息是否重复(rabbit_variable_queue没有实现,直接返回的false)
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
State2 = State1#q{backing_queue_state = BQS1},
case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
State2) of
true ->
State2;
%% 已经将消息发送给消费者的情况
{delivered, State3} ->
State3;
%% The next one is an optimisation(优化)
%% 没有消费者来取消息的情况(discard:抛弃)
%% 当前消息没有发送到对应的消费者,同时当前队列中设置的消息过期时间为0,同时重新发送的exchange交换机为undefined,则立刻将该消息丢弃掉
{undelivered, State3 = #q{ttl = 0, dlx = undefined,
backing_queue_state = BQS2,
msg_id_to_channel = MTC}} ->
%% 直接将消息丢弃掉,如果需要confirm的消息则立刻通知rabbit_channel进程进行confirm操作
{BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
%% 没有消费者来取消息的情况
{undelivered, State3 = #q{backing_queue_state = BQS2}} ->
%% 将消息发布到backing_queue中
BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
%% 判断当前队列中的消息数量超过上限或者消息的占的空间大小超过上限
{Dropped, State4 = #q{backing_queue_state = BQS4}} =
maybe_drop_head(State3#q{backing_queue_state = BQS3}),
%% 得到当前队列中的消息数量
QLen = BQ:len(BQS4),
%% optimisation(优化): it would be perfectly safe to always
%% invoke drop_expired_msgs here, but that is expensive so
%% we only do that if a new message that might have an
%% expiry ends up at the head of the queue. If the head
%% remains unchanged, or if the newly published message
%% has no expiry and becomes the head of the queue then
%% the call is unnecessary.
case {Dropped, QLen =:= 1, Props#message_properties.expiry} of
%% 该情况是头部没有变化,同时消息队列消息树立不为一,则不管当前加入的消息是否设置有超时时间,都不执行drop_expired_msgs函数
{false, false, _} -> State4;
%% 有丢弃消息,同时当前队列中只有当前这个新的消息,同时消息自己的特性过期时间没有定义,则不检查消息过期
%% 此时消息的头部有变化,但是消息队列中只有一个消息,该消息还没有设置超时时间,则不执行drop_expired_msgs函数
{true, true, undefined} -> State4;
%% 当向队列中插入消息后需要做检查消息过期,同时设置定时器的操作只有三种情况
%% 1.当消息头部根据队列上限有变化,同时消息插入后当前队列消息数量为一,且该消息设置有过期时间,则需要做一次操作(该情况是消息头部有删除消息,都会进行一次消息过期检查)
%% 2.当消息头部根据队列上限有变化,同时消息插入后当前队列消息数量不为一,且该消息设置有过期时间,则需要做一次操作(该情况是消息头部有删除消息,都会进行一次消息过期检查)
%% 3.当消息头部根据队列上限没有变化,同时消息插入后当前队列消息数量为一,不管消息有没有过期时间,都要做一次操作(该情况下是当前队列进入第一条消息)
%% 最重要的是只要消息队列的头部消息有变化,则立刻执行drop_expired_msgs函数,将队列头部超时的消息删除掉
{_, _, _} -> drop_expired_msgs(State4)
end
end.
%% 判断当前队列中的消息数量超过上限或者消息的占的空间大小超过上限,如果有查过上限,则将删除第一个元素,然后继续判断是否超过上限,只当当前队列中的消息没有超过上限
maybe_drop_head(State = #q{max_length = undefined,
max_bytes = undefined}) ->
{false, State};
maybe_drop_head(State) ->
maybe_drop_head(false, State).
maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
%% 判断当前队列中的消息数量超过上限或者消息的占的空间大小超过上限
case over_max_length(State) of
true ->
maybe_drop_head(true,
with_dlx(
State#q.dlx,
%% 如果当前队列配置有重新发送的exchange交换机,则从当前队列中取出一个消息将该消息发布到配置的重新发送的exchange交换机上
fun (X) -> dead_letter_maxlen_msg(X, State) end,
%% 如果当前队列没有配置有重新发送的exchange交换机,则将当前队列中的头部消息丢弃掉
fun () ->
{_, BQS1} = BQ:drop(false, BQS),
State#q{backing_queue_state = BQS1}
end));
false ->
{AlreadyDropped, State}
end.
%% 判断当前队列中的消息数量超过上限或者消息的占的空间大小超过上限
over_max_length(#q{max_length = MaxLen,
max_bytes = MaxBytes,
backing_queue = BQ,
backing_queue_state = BQS}) ->
BQ:len(BQS) > MaxLen orelse BQ:info(message_bytes_ready, BQS) > MaxBytes.
%% 将AckTags重新放入到队列中,然后对队列进行重新排序,调用该接口的情况:
%% 1.requeue操作
%% 2.rabbit_channel进程挂掉,然后将还没有ack的消息重新放入到队列中
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
%% 判断当前队列是否为空
WasEmpty = BQ:is_empty(BQS),
%% 让backing_queue将AckTags重新放入到队列后,重新排序
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
%% 判断当前队列中的消息数量超过上限或者消息的占的空间大小超过上限,如果有查过上限,则将删除第一个元素,然后继续判断是否超过上限,只当当前队列中的消息没有超过上限
{_Dropped, State1} = maybe_drop_head(State#q{backing_queue_state = BQS1}),
%% 先将队列中过期的消息删除到没有过期的消息,然后队列中的消息为空后,将当前队列中credit中的mode为drain的消费者取出来,通知这些消费者队列中的消息为空
run_message_queue(maybe_send_drained(WasEmpty, drop_expired_msgs(State1))).
%% 在队列进程中,自己读取消息的接口,执行该函数的情况:
%% 1.客户端单独发送消息来从消息队列中来获取一个消息
%% 2.消息队列自动通过该接口将消息发送到对应的消费者
%% 3.当消息队列中的消息数量或者消息的大小超过了设置的上限,同时设置了死亡消息转发的exchange,则调用该接口,将消息取出来转发,如果没有设置转发exchange,则将消息丢弃掉
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
%% 从backing_queue模块中读取一个消息
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
%% 删除队列中过期的消息(该删除从队列头部开始,如果当下一个消息没有过期,则停止删除过期消息)
State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}),
%% 队列中的消息为空后,将当前队列中credit中的mode为drain的消费者取出来,通知这些消费者队列中的消息为空
{Result, maybe_send_drained(Result =:= empty, State1)}.
%% 实际处理消费者ack操作的函数,调用该接口德尔情况:
%% 1.客户端发送ack消息来进行确认
%% 2.客户端拒绝消息,但是不将消息重新放入队列
%% 3.超过上限的消息在丢弃的时候,需要进行ack操作
ack(AckTags, ChPid, State) ->
subtract_acks(ChPid, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_Guids, BQS1} = BQ:ack(AckTags, BQS),
State1#q{backing_queue_state = BQS1}
end).
%% AckTags这些消息重新放入到队列中,然后对队列进行重新排序(该操作是客户端发送消息拒绝接受需要ack的消息,则将这些还没有ack的消息重新放入到队列中)
%% 1.客户端直接通知将消息重新放入到队列中
%% 2.客户端拒绝消息,但是需要将消息重新放入到队列中
requeue(AckTags, ChPid, State) ->
subtract_acks(ChPid, AckTags, State,
fun (State1) -> requeue_and_run(AckTags, State1) end).
%% 执行完Update函数后,有可能有锁住状态的消费者被激活,如果有被激活的消费者,则将队列中的消息按顺序发送给消费者
possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) ->
case rabbit_queue_consumers:possibly_unblock(Update, ChPid, Consumers) of
%% 如果没有消费者被解锁,则什么都不做
unchanged -> State;
%% 如果有消费者被解锁,将消息一条条的下发给没有锁住的消费者
{unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1},
run_message_queue(true, State1)
end.
%% auto_delete为false,则不自动删除队列
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
%% 如果有消费者,也不自动删除队列
should_auto_delete(#q{has_had_consumers = false}) -> false;
%% 如果auto_delete为true,同时没有消费者了,则将当前队列删除
should_auto_delete(State) -> is_unused(State).
%% 处理rabbit_channel进程down掉的消息
handle_ch_down(DownPid, State = #q{consumers = Consumers,
exclusive_consumer = Holder,
senders = Senders}) ->
%% 取消对rabbit_channel进程的监控
State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of
false -> Senders;
true -> %% 先清除掉对应rabbit_channel进程流量控制信息
credit_flow:peer_down(DownPid),
%% 然后解除对rabbit_channel进程的监控
pmon:demonitor(DownPid, Senders)
end},
%% 删除ChPid对应的cr数据结构
case rabbit_queue_consumers:erase_ch(DownPid, Consumers) of
not_found ->
{ok, State1};
{ChAckTags, ChCTags, Consumers1} ->
QName = qname(State1),
%% 将消费者被删除的信息发布到rabbit_event
[emit_consumer_deleted(DownPid, CTag, QName) || CTag <- ChCTags],
Holder1 = case Holder of
{DownPid, _} -> none;
Other -> Other
end,
State2 = State1#q{consumers = Consumers1,
exclusive_consumer = Holder1},
%% 回调消息队列的修饰模块
notify_decorators(State2),
%% 判断当前队列是否需要自动删除(当前自动删除的条件是auto_delete字段为true,同时当前队列没有消费者的存在)
case should_auto_delete(State2) of
true -> {stop, State2};
%% rabbit_channel进程down掉后,将还没有ack的消息重新放入到队列中,等待后续消费者的消费
false -> {ok, requeue_and_run(ChAckTags,
%% rabbit_channel进程终止,则重新启动队列未操作删除队列自己的定时器
ensure_expiry_timer(State2))}
end
end.
%% 检查当前队列的独有消费者的正确性
check_exclusive_access({_ChPid, _ConsumerTag}, _ExclusiveConsume, _State) ->
in_use;
check_exclusive_access(none, false, _State) ->
ok;
check_exclusive_access(none, true, State) ->
case is_unused(State) of
true -> ok;
false -> in_use
end.