Use credit flow for dest side of on-publish shovels #5715

Conversation
This avoids message queue build-up of the rabbit_writer process in case the destination is slow (or blocked by a resource alarm) and the sending to the socket blocks.

This is applied only for one of the ack-modes:
- on-confirm: already has an acknowledgment for each message
- on-publish: now has credit based flow control, which allows multiple (but a finite number of) in-flight messages at any time
- no-ack: highest performance and asynchronicity, without any guarantees

fixes rabbitmq#3407
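For context, here is a minimal sketch of how `rabbit_common`'s `credit_flow` API is typically wired between a fast sender and a slower peer. This is illustrative only, not the actual change in this PR; the module, function names and message shapes around the `credit_flow` calls are made up:

```erlang
-module(credit_flow_sketch).
-export([send_to/2, handle_info/2, handle_msg/1]).

%% Sender side: debit one unit of credit towards Peer before sending.
%% Once the initial credit is exhausted, credit_flow marks this process
%% as blocked until Peer grants more.
send_to(Peer, Msg) ->
    credit_flow:send(Peer),
    Peer ! {self(), Msg}.

%% Sender side: the peer grants credit back in batches via
%% {bump_credit, ...} messages, which may unblock this process.
handle_info({bump_credit, Msg}, State) ->
    credit_flow:handle_bump_msg(Msg),
    {noreply, State}.

%% Receiver side: acknowledge processed messages so that the sender's
%% credit is periodically topped up.
handle_msg({From, _Msg}) ->
    credit_flow:ack(From).
```

The net effect is what the description above relies on: the sender can only ever have a bounded number of unacknowledged sends outstanding towards the writer, instead of an unbounded mailbox.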
michaelklishin left a comment
I have observed Shovels getting stuck, two runs out of two.
Here are the steps I used:
- Start two nodes on the same host
- Add a shovel from `src-q` on node A to `dest-q` on node B, with `on-publish` or `on-confirm` mode, with QoS prefetch set to 10,000
- Ensure that it is functional
- Add a PerfTest instance that publishes to `src-q` but has no consumers (`-y 0`)
- Observe the shovel transfer a few million messages, then stop
- Restart the shovel
- Observe it transfer the rest of the messages
There were no alarms in effect on either node during the tests.
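(A note on the "ensure that it is functional" step: one way to check is via the shovel plugin's status module. A minimal probe, run on the node hosting the shovel, e.g. via `rabbitmqctl eval` or a remote shell:)

```erlang
%% Each entry reported by the shovel plugin includes the shovel's name
%% and its current state (starting, running, or terminated).
rabbit_shovel_status:status().
```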
Thank you Michael, I will investigate.
Unfortunately I cannot reproduce it yet. What I tried is to have the shovel on the src side in on-publish mode with prefetch 10K, with either a direct or a network connection to the src. Instead of PerfTest I used a simple Erlang client publishing 10-byte-payload messages as fast as it can, without publisher confirms. Publishing ~20M messages put quite a memory pressure on my laptop, and the src-q was struggling to process the publishes (the load test connection was in flow state frequently), but the shovel ran fine, eventually moving all messages to dest-q.
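For reference, the load generator was roughly of this shape (a hypothetical reconstruction using the Erlang AMQP 0-9-1 client; the module name, `publish_n/2` and the payload are illustrative):

```erlang
-module(flood_src_q).
-export([publish_n/2]).

-include_lib("amqp_client/include/amqp_client.hrl").

%% Publish N small messages to the default exchange, routed to src-q,
%% as fast as possible and without publisher confirms.
publish_n(0, _Ch) ->
    ok;
publish_n(N, Ch) ->
    amqp_channel:cast(Ch,
                      #'basic.publish'{routing_key = <<"src-q">>},
                      #amqp_msg{payload = <<"0123456789">>}),
    publish_n(N - 1, Ch).
```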
The Shovel was defined on the source node. If you specify "localhost", that end will use a direct connection IIRC.
Unfortunately I still couldn't reproduce a stuck shovel. I tried with various queue types to get different timings and prefetch behaviour, but no luck: I could push 10M messages through. I cannot expect you to spend a lot of time debugging this, but if you eventually find the time and manage to reproduce it, it would be great to see the state of the shovel process. Something like below (assuming only one shovel):
I could reproduce this at about 11M messages:

```erlang
[ShovelPid] = [P || P <- processes(), (catch element(1, proc_lib:initial_call(P))) == rabbit_shovel_worker].
[<0.8067.0>]
%% recon:info(ShovelPid).
[{meta,[{registered_name,[]},
{dictionary,[{gen_server_call_timeout,130000},
{rand_seed,{#{bits => 58,jump => #Fun<rand.3.34006561>,
next => #Fun<rand.0.34006561>,type => exsss,
uniform => #Fun<rand.1.34006561>,
uniform_n => #Fun<rand.2.34006561>},
[265102220148989584|28035894333660420]}},
{'$ancestors',[<0.8066.0>,<0.641.0>,
rabbit_shovel_dyn_worker_sup_sup,rabbit_shovel_sup,
<0.637.0>]},
{'$initial_call',{rabbit_shovel_worker,init,1}}]},
{group_leader,<0.636.0>},
{status,running}]},
{signals,[{links,[<0.8070.0>,<0.8088.0>,<0.8097.0>,
<0.8080.0>,<0.8066.0>]},
{monitors,[{process,<0.8097.0>}]},
{monitored_by,[<0.8079.0>,<0.8097.0>]},
{trap_exit,true}]},
{location,[{initial_call,{proc_lib,init_p,5}},
{current_stacktrace,[{gen,do_call,4,
[{file,"gen.erl"},{line,256}]},
{gen_server,call,3,[{file,"gen_server.erl"},{line,378}]},
{rabbit_amqp091_shovel,publish,4,
[{file,"rabbit_amqp091_shovel.erl"},{line,329}]},
{rabbit_shovel_worker,handle_info,2,
[{file,"rabbit_shovel_worker.erl"},{line,104}]},
{gen_server2,handle_msg,2,
[{file,"gen_server2.erl"},{line,1067}]},
{proc_lib,init_p_do_apply,3,
[{file,"proc_lib.erl"},{line,240}]}]}]},
{memory_used,[{memory,1804328},
{message_queue_len,1},
{heap_size,28690},
{total_heap_size,225363},
{garbage_collection,[{max_heap_size,#{error_logger => true,kill => true,size => 0}},
{min_bin_vheap_size,46422},
{min_heap_size,233},
{fullsweep_after,65535},
{minor_gcs,51}]}]},
{work,[{reductions,3064020003}]}]
%% gen_server2:with_state(ShovelPid, fun(#state{config = Config = #{dest := Dest = #{pending := Pending}}}) -> Config#{dest => Dest#{pending => {length, length(Pending)}}}; (S) -> S end).
#state{inbound_conn = undefined,inbound_ch = undefined,
outbound_conn = undefined,outbound_ch = undefined,
name = {<<"/">>,<<"sh-1">>},
type = dynamic,
config = #{ack_mode => on_confirm,
dest =>
#{blocked_by => [],
current =>
{<0.8088.0>,<0.8097.0>,<<"amqp://localhost:5673">>},
dest_queue => <<"dest-q">>,
fields_fun => #Fun<rabbit_shovel_parameters.8.121811777>,
module => rabbit_amqp091_shovel,
props_fun => #Fun<rabbit_shovel_parameters.9.121811777>,
resource_decl => #Fun<rabbit_shovel_parameters.7.121811777>,
unacked =>
#{3194257 => 3194257,3194163 => 3194163,3194294 => 3194294,
3194272 => 3194272,3194107 => 3194107,3194133 => 3194133,
3194277 => 3194277,3194255 => 3194255,3194129 => 3194129,
3194321 => 3194321,3194222 => 3194222,3194193 => 3194193,
3194103 => 3194103,3194382 => 3194382,3194205 => 3194205,
3194218 => 3194218,3194345 => 3194345,3194352 => 3194352,
3194121 => 3194121,...},
uris => ["amqp://localhost:5673"]},
name => <<"sh-1">>,reconnect_delay => 5,
shovel_type => dynamic,
source =>
#{consumer_args => [],
current => {<0.8070.0>,<0.8080.0>,<<"amqp://">>},
delete_after => never,module => rabbit_amqp091_shovel,
prefetch_count => 1000,queue => <<"src-q">>,
remaining => unlimited,remaining_unacked => unlimited,
resource_decl => #Fun<rabbit_shovel_parameters.11.121811777>,
source_exchange_key => <<>>,
uris => ["amqp://"]}},
inbound_uri = undefined,outbound_uri = undefined,
unacked = undefined,remaining = undefined,
remaining_unacked = undefined}
```
Curiously, when I left the node alone for a minute or so, the Shovel suddenly started transferring messages again. It could be a Heisenbug, and my debugging steps could have had an effect, but I cannot prove this.
I could reproduce this behavior with about 16M messages in the destination queue.
Again, when I purged (not deleted, which would have restarted the Shovel) the destination queue, the Shovel got unblocked. I guess I may be reaching a limit of some kind with this kind of backlog, with all defaults.
Use credit flow for dest side of on-publish shovels (backport #5715)
I really appreciate your investigation. Apparently I didn't go far enough: with ~14M messages I also managed to reproduce the behaviour you saw. I suspect it must be something with memory allocation on the destination node; when the dest-q process reached ~10GB of memory, the shovel got blocked for minutes and the whole dest node became unresponsive for tens of seconds. It's good to see that the flow control in the shovel works as intended during this quite heavy load test. Thanks again.
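(For anyone retracing this, the queue process footprint can be checked from a remote shell with something like the following; this is a sketch that assumes a recent release where the `amqqueue` record module is available and the queue lives in the default vhost:)

```erlang
%% Find the dest-q queue process and report its memory use and
%% Erlang mailbox length.
{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, <<"dest-q">>)),
erlang:process_info(amqqueue:get_pid(Q), [memory, message_queue_len]).
```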
Proposed Changes
This avoids message queue build up of the rabbit_writer process in case
the destination is slow (or blocked by a resource alarm) and the sending
to the socket blocks.
This is applied only for one of the ack-modes:
- on-confirm: already has an acknowledgment for each message
- on-publish: now has credit based flow control, which allows multiple (but a finite number of) in-flight messages at any time
- no-ack: highest performance and asynchronicity, without any guarantees
fixes #3407
I had a hard time writing a test case for this, or coming up with properties that hold. The test case in the commit is maybe a bit too introspective into implementation details. But how credit flow behaves between more than two processes, and when exactly it blocks, is very timing dependent.
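To illustrate what such an introspective check can look like, here is a hedged sketch (the module and helper names are made up; it peeks at `credit_flow`'s process-dictionary bookkeeping, which is exactly the kind of implementation detail mentioned above):

```erlang
-module(shovel_block_probe).
-export([wait_until_blocked/1]).

%% Poll until Pid reports itself blocked on credit flow, or give up.
%% credit_flow keeps its state in the sender's process dictionary, so a
%% test can observe it from the outside via process_info/2.
wait_until_blocked(Pid) ->
    wait_until_blocked(Pid, 50).

wait_until_blocked(_Pid, 0) ->
    error(never_blocked);
wait_until_blocked(Pid, Retries) ->
    {dictionary, Dict} = erlang:process_info(Pid, dictionary),
    %% credit_blocked holds the pids that currently block this process.
    case proplists:get_value(credit_blocked, Dict, []) of
        []      -> timer:sleep(100),
                   wait_until_blocked(Pid, Retries - 1);
        [_ | _] -> ok
    end.
```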
Types of Changes
What types of changes does your code introduce to this project? Put an `x` in the boxes that apply.

Checklist
Put an `x` in the boxes that apply. You can also fill these out after creating the PR.
If you're unsure about any of them, don't hesitate to ask on the mailing list.
We're here to help!
This is simply a reminder of what we are going to look for before merging your code.
I have read the `CONTRIBUTING.md` document

Further Comments
If this is a relatively large or complex change, kick off the discussion by explaining why you chose the solution you did and what alternatives you considered, etc.