
Conversation

@gomoripeti
Contributor

Proposed Changes

This avoids message queue build-up in the rabbit_writer process when
the destination is slow (or blocked by a resource alarm) and sending
to the socket blocks.

Credit-based flow control is applied to only one of the ack modes:

  • on-confirm: already has an acknowledgment for each message
  • on-publish: now has credit-based flow control, which allows multiple
    but only a finite number of in-flight messages at any time (see the
    sketch below)
  • no-ack: highest performance and asynchronicity, without any guarantees

fixes #3407
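
For context, below is a minimal sketch of the credit_flow pattern from rabbit_common that this change builds on. The credit_flow calls and the {bump_credit, ...} message are the library's; the surrounding process code is illustrative, not the actual Shovel implementation:

    %% Sender side: consume one unit of credit before handing a message over.
    %% With the default credit of {400, 200}, the sender starts with 400
    %% credits per peer and blocks when its credit for DestPid reaches zero.
    publish_with_flow(DestPid, Msg) ->
        credit_flow:send(DestPid),
        DestPid ! {publish, self(), Msg}.

    %% Receiver side: ack each processed message; credit_flow batches acks
    %% and replies with a {bump_credit, ...} message replenishing the sender.
    handle_publish(SenderPid, Msg) ->
        ok = do_publish(Msg),  %% stand-in for the real destination publish
        credit_flow:ack(SenderPid).

    %% Sender side again: apply the credit grant when it arrives.
    handle_info({bump_credit, BumpMsg}, State) ->
        credit_flow:handle_bump_msg(BumpMsg),
        {noreply, State}.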

I had a hard time writing a test case for this or coming up with properties that hold. The test case in the commit is perhaps a bit too introspective of implementation details, but how credit flow behaves between more than two processes, and exactly when it blocks, is very timing-dependent.

Types of Changes

What types of changes does your code introduce to this project?
Put an x in the boxes that apply

  • Bug fix (non-breaking change which fixes issue #NNNN)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause an observable behavior change in existing systems)
  • Documentation improvements (corrections, new content, etc)
  • Cosmetic change (whitespace, formatting, etc)
  • Build system and/or CI

Checklist

Put an x in the boxes that apply.
You can also fill these out after creating the PR.
If you're unsure about any of them, don't hesitate to ask on the mailing list.
We're here to help!
This is simply a reminder of what we are going to look for before merging your code.

  • I have read the CONTRIBUTING.md document
  • I have signed the CA (see https://cla.pivotal.io/sign/rabbitmq)
  • I have added tests that prove my fix is effective or that my feature works
  • All tests pass locally with my changes
  • If relevant, I have added necessary documentation to https://github.com/rabbitmq/rabbitmq-website
  • If relevant, I have added this change to the first version(s) in release-notes that I expect to introduce it

Further Comments

If this is a relatively large or complex change, kick off the discussion by explaining why you chose the solution you did and what alternatives you considered, etc.

Collaborator

@michaelklishin left a comment


I have observed Shovels getting stuck, two runs out of two.

Here are the steps I used:

  1. Start two nodes on the same host
  2. Add a shovel from src-q on node A to dest-q on node B, in on-publish or on-confirm mode, with QoS prefetch set to 10,000 (see the sketch after this list)
  3. Ensure that it is functional
  4. Add a PerfTest instance that publishes to src-q but has no consumers (-y 0)
  5. Observe the shovel transfer a few million messages, then stop
  6. Restart the shovel
  7. Observe it transfer the rest of the messages

There were no alarms in effect on either node during the tests.
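
For reference, here is a hedged sketch of step 2, defining the dynamic Shovel from node A's Erlang shell. The parameter keys follow the dynamic Shovel documentation; the vhost, shovel name, and acting user are assumptions:

    %% assumes the default vhost <<"/">> and a user named <<"guest">>
    Definition = [{<<"src-protocol">>,       <<"amqp091">>},
                  {<<"src-uri">>,            <<"amqp://">>},
                  {<<"src-queue">>,          <<"src-q">>},
                  {<<"src-prefetch-count">>, 10000},
                  {<<"dest-protocol">>,      <<"amqp091">>},
                  {<<"dest-uri">>,           <<"amqp://localhost:5673">>},
                  {<<"dest-queue">>,         <<"dest-q">>},
                  {<<"ack-mode">>,           <<"on-publish">>}],
    rabbit_runtime_parameters:set(<<"/">>, <<"shovel">>, <<"sh-1">>,
                                  Definition, <<"guest">>).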


@gomoripeti
Contributor Author

Unfortunately I cannot reproduce it yet.
Did you have the shovel defined on the src or the dest broker?
Did you have a direct connection on either side, or network connections on both sides?

What I tried is to have the shovel on the src side in on-publish mode with a prefetch of 10K, and either a direct or a network connection to the src. Instead of PerfTest I used a simple Erlang client publishing 10-byte payload messages as fast as it can, without publisher confirms (a sketch follows below). Publishing ~20M messages put quite a lot of memory pressure on my laptop, and src-q was struggling to keep up with the publishes (the load-test connection was frequently in flow state), but the shovel ran fine, eventually moving all messages to dest-q.
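
For completeness, a rough sketch of that load generator using the Erlang amqp_client library; the connection parameters and the loop structure are illustrative:

    %% requires -include_lib("amqp_client/include/amqp_client.hrl")
    publish_loop(N) ->
        {ok, Conn} = amqp_connection:start(#amqp_params_network{}),
        {ok, Ch} = amqp_connection:open_channel(Conn),
        Publish = #'basic.publish'{routing_key = <<"src-q">>},
        Msg = #amqp_msg{payload = <<"0123456789">>},  %% 10-byte payload
        %% cast/3 is fire-and-forget, i.e. no publisher confirms
        [amqp_channel:cast(Ch, Publish, Msg) || _ <- lists:seq(1, N)],
        amqp_connection:close(Conn).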

@michaelklishin
Collaborator

The Shovel was defined on the source node. If you specify "localhost", that end will use a direct connection IIRC.

@gomoripeti
Contributor Author

Unfortunately I still couldn't reproduce a stuck shovel. I tried with various queue types to get different timings and prefetch behaviour, but no luck; I could push 10M messages through.

I can't expect you to spend a lot of time debugging this, but if you eventually find the time and manage to reproduce it, it would be great to see the state of the shovel process. Something like below (assuming only one shovel):

    [ShovelPid] = [P || P <- processes(), (catch element(1, proc_lib:initial_call(P))) == rabbit_shovel_worker].
    recon:info(ShovelPid).
    %% the pending field can contain up to prefetch-count messages, hence it's enough to see just the length of the list
    gen_server2:with_state(ShovelPid, fun(#state{config = Config = #{dest := Dest = #{pending := Pending}}}) -> Config#{dest => Dest#{pending => {length, length(Pending)}}};  (S) -> S end).
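
Note that if these are run from a remote Erlang shell rather than via rabbitmqctl eval, the #state{} record definition has to be loaded first; assuming the module was compiled with debug_info, something like:

    rr(rabbit_shovel_worker).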

@michaelklishin
Collaborator

I could reproduce this at about 11M messages:

[ShovelPid] = [P || P <- processes(), (catch element(1, proc_lib:initial_call(P))) == rabbit_shovel_worker].
[<0.8067.0>]


%% recon:info(ShovelPid).
[{meta,[{registered_name,[]},
        {dictionary,[{gen_server_call_timeout,130000},
                     {rand_seed,{#{bits => 58,jump => #Fun<rand.3.34006561>,
                                   next => #Fun<rand.0.34006561>,type => exsss,
                                   uniform => #Fun<rand.1.34006561>,
                                   uniform_n => #Fun<rand.2.34006561>},
                                 [265102220148989584|28035894333660420]}},
                     {'$ancestors',[<0.8066.0>,<0.641.0>,
                                    rabbit_shovel_dyn_worker_sup_sup,rabbit_shovel_sup,
                                    <0.637.0>]},
                     {'$initial_call',{rabbit_shovel_worker,init,1}}]},
        {group_leader,<0.636.0>},
        {status,running}]},
 {signals,[{links,[<0.8070.0>,<0.8088.0>,<0.8097.0>,
                   <0.8080.0>,<0.8066.0>]},
           {monitors,[{process,<0.8097.0>}]},
           {monitored_by,[<0.8079.0>,<0.8097.0>]},
           {trap_exit,true}]},
 {location,[{initial_call,{proc_lib,init_p,5}},
            {current_stacktrace,[{gen,do_call,4,
                                      [{file,"gen.erl"},{line,256}]},
                                 {gen_server,call,3,[{file,"gen_server.erl"},{line,378}]},
                                 {rabbit_amqp091_shovel,publish,4,
                                                        [{file,"rabbit_amqp091_shovel.erl"},{line,329}]},
                                 {rabbit_shovel_worker,handle_info,2,
                                                       [{file,"rabbit_shovel_worker.erl"},{line,104}]},
                                 {gen_server2,handle_msg,2,
                                              [{file,"gen_server2.erl"},{line,1067}]},
                                 {proc_lib,init_p_do_apply,3,
                                           [{file,"proc_lib.erl"},{line,240}]}]}]},
 {memory_used,[{memory,1804328},
               {message_queue_len,1},
               {heap_size,28690},
               {total_heap_size,225363},
               {garbage_collection,[{max_heap_size,#{error_logger => true,kill => true,size => 0}},
                                    {min_bin_vheap_size,46422},
                                    {min_heap_size,233},
                                    {fullsweep_after,65535},
                                    {minor_gcs,51}]}]},
 {work,[{reductions,3064020003}]}]


%% gen_server2:with_state(ShovelPid, fun(#state{config = Config = #{dest := Dest = #{pending := Pending}}}) -> Config#{dest => Dest#{pending => {length, length(Pending)}}};  (S) -> S end).
#state{inbound_conn = undefined,inbound_ch = undefined,
       outbound_conn = undefined,outbound_ch = undefined,
       name = {<<"/">>,<<"sh-1">>},
       type = dynamic,
       config = #{ack_mode => on_confirm,
                  dest =>
                      #{blocked_by => [],
                        current =>
                            {<0.8088.0>,<0.8097.0>,<<"amqp://localhost:5673">>},
                        dest_queue => <<"dest-q">>,
                        fields_fun => #Fun<rabbit_shovel_parameters.8.121811777>,
                        module => rabbit_amqp091_shovel,
                        props_fun => #Fun<rabbit_shovel_parameters.9.121811777>,
                        resource_decl => #Fun<rabbit_shovel_parameters.7.121811777>,
                        unacked =>
                            #{3194257 => 3194257,3194163 => 3194163,3194294 => 3194294,
                              3194272 => 3194272,3194107 => 3194107,3194133 => 3194133,
                              3194277 => 3194277,3194255 => 3194255,3194129 => 3194129,
                              3194321 => 3194321,3194222 => 3194222,3194193 => 3194193,
                              3194103 => 3194103,3194382 => 3194382,3194205 => 3194205,
                              3194218 => 3194218,3194345 => 3194345,3194352 => 3194352,
                              3194121 => 3194121,...},
                        uris => ["amqp://localhost:5673"]},
                  name => <<"sh-1">>,reconnect_delay => 5,
                  shovel_type => dynamic,
                  source =>
                      #{consumer_args => [],
                        current => {<0.8070.0>,<0.8080.0>,<<"amqp://">>},
                        delete_after => never,module => rabbit_amqp091_shovel,
                        prefetch_count => 1000,queue => <<"src-q">>,
                        remaining => unlimited,remaining_unacked => unlimited,
                        resource_decl => #Fun<rabbit_shovel_parameters.11.121811777>,
                        source_exchange_key => <<>>,
                        uris => ["amqp://"]}},
       inbound_uri = undefined,outbound_uri = undefined,
       unacked = undefined,remaining = undefined,
       remaining_unacked = undefined}

@michaelklishin
Collaborator

Curiously, when I left the node alone for a minute or so, the Shovel suddenly started transferring messages again. It may be a Heisenbug, and my debugging steps could have had an effect, but I cannot prove this.

@michaelklishin
Collaborator

I could reproduce this behavior with about 16M messages in on-publish mode. Neither node had any alarms in effect, but note that the Shovel is blocked by credit flow:

[{meta,[{registered_name,[]},
        {dictionary,[{credit_blocked,[<0.8562.0>]},
                     {'$initial_call',{rabbit_shovel_worker,init,1}},
                     {'$ancestors',[<0.8531.0>,<0.641.0>,
                                    rabbit_shovel_dyn_worker_sup_sup,rabbit_shovel_sup,
                                    <0.637.0>]},
                     {credit_flow_default_credit,{400,200}},
                     {gen_server_call_timeout,130000},
                     {credit_blocked_at,-576456243340600600},
                     {{credit_from,<0.8562.0>},0},
                     {rand_seed,{#{bits => 58,jump => #Fun<rand.3.34006561>,
                                   next => #Fun<rand.0.34006561>,type => exsss,
                                   uniform => #Fun<rand.1.34006561>,
                                   uniform_n => #Fun<rand.2.34006561>},
                                 [243017931421367404|36627234746864114]}}]},
        {group_leader,<0.636.0>},
        {status,waiting}]},
 {signals,[{links,[<0.8535.0>,<0.8553.0>,<0.8562.0>,
                   <0.8544.0>,<0.8531.0>]},
           {monitors,[]},
           {monitored_by,[<0.8543.0>]},
           {trap_exit,true}]},
 {location,[{initial_call,{proc_lib,init_p,5}},
            {current_stacktrace,[{gen_server2,process_next_msg,1,
                                              [{file,"gen_server2.erl"},{line,673}]},
                                 {proc_lib,init_p_do_apply,3,
                                           [{file,"proc_lib.erl"},{line,240}]}]}]},
 {memory_used,[{memory,1115288},
               {message_queue_len,0},
               {heap_size,17731},
               {total_heap_size,139267},
               {garbage_collection,[{max_heap_size,#{error_logger => true,kill => true,size => 0}},
                                    {min_bin_vheap_size,46422},
                                    {min_heap_size,233},
                                    {fullsweep_after,65535},
                                    {minor_gcs,2}]}]},
 {work,[{reductions,2964425726}]}]


%% gen_server2:with_state(ShovelPid, fun(#state{config = Config = #{dest := Dest = #{pending := Pending}}}) -> Config#{dest => Dest#{pending => {length, length(Pending)}}};  (S) -> S end).
#{ack_mode => on_publish,
  dest =>
      #{blocked_by => [flow],
        current =>
            {<0.8553.0>,<0.8562.0>,<<"amqp://localhost:5673">>},
        dest_queue => <<"dest-q">>,
        fields_fun => #Fun<rabbit_shovel_parameters.8.121811777>,
        module => rabbit_amqp091_shovel,
        pending => {length,1000},
        props_fun => #Fun<rabbit_shovel_parameters.9.121811777>,
        resource_decl => #Fun<rabbit_shovel_parameters.7.121811777>,
        unacked => #{},
        uris => ["amqp://localhost:5673"]},
  name => <<"sh-2">>,reconnect_delay => 5,
  shovel_type => dynamic,
  source =>
      #{consumer_args => [],
        current => {<0.8535.0>,<0.8544.0>,<<"amqp://">>},
        delete_after => never,module => rabbit_amqp091_shovel,
        prefetch_count => 1000,queue => <<"src-q">>,
        remaining => unlimited,remaining_unacked => unlimited,
        resource_decl => #Fun<rabbit_shovel_parameters.11.121811777>,
        source_exchange_key => <<>>,
        uris => ["amqp://"]}}

@michaelklishin
Collaborator

Again, when I purged (not deleted, which would have restarted the Shovel) the destination queue, the Shovel got unblocked.

I guess I may be reaching a limit of some kind with a backlog this size, all defaults, and a durable CQv1 queue.

@michaelklishin merged commit 0d7e410 into rabbitmq:main Sep 13, 2022
michaelklishin added a commit that referenced this pull request Sep 13, 2022
Use credit flow for dest side of on-publish shovels (backport #5715)
@gomoripeti
Contributor Author

I really appreciate your investigation. Apparently I didn't go far enough: with ~14M messages I also managed to reproduce the behaviour you saw. I suspect it must be something to do with memory allocation on the destination node; when the dest-q process reached ~10GB of memory, the shovel got blocked for minutes and the whole dest node became unresponsive for tens of seconds.

It's good to see that the flow control in the Shovel works as intended during this quite heavy load test.

Thanks again.

@gomoripeti deleted the shovel_flow_control branch September 14, 2022 14:51