Skip to content

Commit

Permalink
Avoid frequent #resource{} record creation
Browse files Browse the repository at this point in the history
Avoid #resource{} record creation in every queue type
interaction by storing the queue #resource{} in the
session state.
  • Loading branch information
ansd committed Jun 5, 2024
1 parent d70e529 commit cf3c8ba
Showing 1 changed file with 12 additions and 22 deletions.
34 changes: 12 additions & 22 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@
-record(outgoing_link, {
%% Although the source address of a link might be an exchange name and binding key
%% or a topic filter, an outgoing link will always consume from a queue.
queue_name_bin :: rabbit_misc:resource_name(),
queue_name :: rabbit_amqqueue:name(),
queue_type :: rabbit_queue_type:queue_type(),
send_settled :: boolean(),
max_message_size :: unlimited | pos_integer(),
Expand Down Expand Up @@ -670,7 +670,7 @@ handle_stashed_down(#state{stashed_down = QNames,
%% (This roughly corresponds to consumer_cancel_notify sent from server to client in AMQP 0.9.1.)
{DetachFrames, OutgoingLinks} =
lists:foldl(fun(#resource{name = QNameBinDown}, Acc = {_, OutgoingLinks1}) ->
maps:fold(fun(Handle, Link = #outgoing_link{queue_name_bin = QNameBin}, {Frames0, Links0})
maps:fold(fun(Handle, Link = #outgoing_link{queue_name = #resource{name = QNameBin}}, {Frames0, Links0})
when QNameBin =:= QNameBinDown ->
Detach = detach(Handle, Link, ?V_1_0_AMQP_ERROR_ILLEGAL_STATE),
Frames = [Detach | Frames0],
Expand Down Expand Up @@ -781,7 +781,7 @@ destroy_incoming_link(Handle, Link = #incoming_link{queue_name_bin = QNameBin},
destroy_incoming_link(_, _, _, Acc) ->
Acc.

destroy_outgoing_link(Handle, Link = #outgoing_link{queue_name_bin = QNameBin}, QNameBin, {Frames, Unsettled0, Links}) ->
destroy_outgoing_link(Handle, Link = #outgoing_link{queue_name = #resource{name = QNameBin}}, QNameBin, {Frames, Unsettled0, Links}) ->
{Unsettled, _RemovedMsgIds} = remove_link_from_outgoing_unsettled_map(Handle, Unsettled0),
{[detach(Handle, Link, ?V_1_0_AMQP_ERROR_RESOURCE_DELETED) | Frames],
Unsettled,
Expand Down Expand Up @@ -1093,7 +1093,7 @@ handle_control(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
max_message_size = MaybeMaxMessageSize},
MaxMessageSize = max_message_size(MaybeMaxMessageSize),
Link = #outgoing_link{
queue_name_bin = QNameBin,
queue_name = queue_resource(Vhost, QNameBin),
queue_type = QType,
send_settled = SndSettled,
max_message_size = MaxMessageSize,
Expand Down Expand Up @@ -1207,16 +1207,13 @@ handle_control(Detach = #'v1_0.detach'{handle = ?UINT(HandleInt)},
incoming_links = IncomingLinks,
outgoing_links = OutgoingLinks0,
outgoing_unsettled_map = Unsettled0,
cfg = #cfg{
vhost = Vhost,
user = #user{username = Username}}}) ->
cfg = #cfg{user = #user{username = Username}}}) ->
Ctag = handle_to_ctag(HandleInt),
%% TODO delete queue if closed flag is set to true? see 2.6.6
%% TODO keep the state around depending on the lifetime
{QStates, Unsettled, OutgoingLinks}
= case maps:take(HandleInt, OutgoingLinks0) of
{#outgoing_link{queue_name_bin = QNameBin}, OutgoingLinks1} ->
QName = queue_resource(Vhost, QNameBin),
{#outgoing_link{queue_name = QName}, OutgoingLinks1} ->
case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
%%TODO Add a new rabbit_queue_type:remove_consumer API that - from the point of view of
Expand Down Expand Up @@ -1448,10 +1445,9 @@ handle_credit_reply0(
S;
none when QCredit =:= 0 andalso
DesiredCredit > 0 ->
QName = Link0#outgoing_link.queue_name,
%% Provide queue next batch of credits.
CappedCredit = cap_credit(DesiredCredit),
QName = queue_resource(S0#state.cfg#cfg.vhost,
Link0#outgoing_link.queue_name_bin),
{ok, QStates, Actions} =
rabbit_queue_type:credit(
QName, Ctag, DeliveryCount, CappedCredit, false, QStates0),
Expand All @@ -1471,7 +1467,7 @@ handle_credit_reply0(
{credit_reply, Ctag, DeliveryCount, Credit, Available, _Drain = true},
Handle,
Link0 = #outgoing_link{
queue_name_bin = QNameBin,
queue_name = QName,
client_flow_ctl = #client_flow_ctl{
delivery_count = CDeliveryCount0 } = CFC,
queue_flow_ctl = #queue_flow_ctl{
Expand All @@ -1480,7 +1476,6 @@ handle_credit_reply0(
} = QFC,
stashed_credit_req = StashedCreditReq},
S0 = #state{cfg = #cfg{writer_pid = Writer,
vhost = Vhost,
channel_num = ChanNum},
outgoing_links = OutgoingLinks,
queue_states = QStates0}) ->
Expand All @@ -1500,7 +1495,6 @@ handle_credit_reply0(
CappedCredit = cap_credit(DesiredCredit),
Link = Link0#outgoing_link{queue_flow_ctl = QFC#queue_flow_ctl{credit = CappedCredit}},

QName = queue_resource(Vhost, QNameBin),
{ok, QStates, Actions} =
rabbit_queue_type:credit(
QName, Ctag, DeliveryCount, CappedCredit, true, QStates0),
Expand Down Expand Up @@ -1545,7 +1539,7 @@ handle_credit_reply0(
pop_credit_req(
Handle, Ctag,
Link0 = #outgoing_link{
queue_name_bin = QNameBin,
queue_name = QName,
client_flow_ctl = #client_flow_ctl{
delivery_count = CDeliveryCount
} = CFC,
Expand All @@ -1558,13 +1552,11 @@ pop_credit_req(
drain = Drain,
echo = Echo
}},
S0 = #state{cfg = #cfg{vhost = Vhost},
outgoing_links = OutgoingLinks,
S0 = #state{outgoing_links = OutgoingLinks,
queue_states = QStates0}) ->
LinkCreditSnd = amqp10_util:link_credit_snd(
DeliveryCountRcv, LinkCreditRcv, CDeliveryCount),
CappedCredit = cap_credit(LinkCreditSnd),
QName = queue_resource(Vhost, QNameBin),
{ok, QStates, Actions} =
rabbit_queue_type:credit(
QName, Ctag, QDeliveryCount, CappedCredit, Drain, QStates0),
Expand Down Expand Up @@ -2673,7 +2665,7 @@ handle_outgoing_mgmt_link_flow_control(
State#state{outgoing_management_links = Links}.

handle_outgoing_link_flow_control(
#outgoing_link{queue_name_bin = QNameBin,
#outgoing_link{queue_name = QName,
credit_api_version = CreditApiVsn,
client_flow_ctl = CFC,
queue_flow_ctl = QFC,
Expand All @@ -2685,10 +2677,8 @@ handle_outgoing_link_flow_control(
drain = Drain0,
echo = Echo0},
#state{outgoing_links = OutgoingLinks,
queue_states = QStates0,
cfg = #cfg{vhost = Vhost}
queue_states = QStates0
} = State0) ->
QName = queue_resource(Vhost, QNameBin),
Ctag = handle_to_ctag(HandleInt),
DeliveryCountRcv = delivery_count_rcv(MaybeDeliveryCountRcv),
Drain = default(Drain0, false),
Expand Down

0 comments on commit cf3c8ba

Please sign in to comment.