
[pulsar-broker] Broker handle back-pressure with max-pending message across topics to avoid OOM #7499

Open
wants to merge 1 commit into base: master
Conversation

rdhabalia
Contributor

Motivation

We have seen multiple scenarios in which a broker suddenly sees a huge spike in heap-memory usage, consumes all of its allocated heap memory, and eventually crashes with OOM. One of these scenarios is that the broker can't handle the back-pressure caused by bookie add-entry timeouts.
The broker limits the max pending messages per topic, but it doesn't limit the total number of pending messages across all topics. If the broker is serving many topics with a high publish rate and, for some reason, starts seeing add-entry timeouts from the BK client, it allocates a large number of non-recyclable objects, which causes heavy GC and eventually an OOM crash. We have seen many brokers crash at the same time due to BK network partitioning or high BK add-entry latency. It can be easily reproduced by simulating bookie behavior that causes a Bookie operation timeout error at the broker and publishing at a 30K-40K msg/s rate across 1K topics.
Therefore, we need a mechanism to handle bookie back-pressure at the broker by limiting the number of pending messages across all topics in the broker.

Broker-Error: Add-entry timing out at bk-client

org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [prop/cluster/ns/persistent/t1] Created new ledger 123456
13:25:04.468 [BookKeeperClientWorker-OrderedExecutor-20-0] WARN  org.apache.bookkeeper.client.PendingAddOp - Failed to write entry (123456, 1): Bookie operation timeout
13:25:04.469 [BookKeeperClientWorker-OrderedExecutor-20-0] WARN  org.apache.bookkeeper.client.PendingAddOp - Failed to write entry (123456, 2): Bookie operation timeout
13:25:04.469 [BookKeeperClientWorker-OrderedExecutor-20-0] WARN  org.apache.bookkeeper.client.PendingAddOp - Failed to write entry (123456, 3): Bookie operation timeout
13:25:04.469 [BookKeeperClientWorker-OrderedExecutor-20-0] WARN  org.apache.bookkeeper.client.PendingAddOp - Failed to write entry (123456, 4): Bookie operation timeout

Broker sees sudden spike in heap memory usage and crashes

Modification

  • add a configuration option to restrict the total number of pending publish messages across all topics in a broker: maxConcurrentPendingPublishMessages (see the sketch below)
  • by default this feature is disabled (value = 0) and does not change any existing behavior
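
For illustration only, a minimal sketch of how such a broker-wide limit could be wired up, assuming a LongAdder-based counter; apart from the maxConcurrentPendingPublishMessages setting named above, all class and method names here are hypothetical and not taken from the actual patch:

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch of broker-wide pending-publish accounting.
// Only the maxConcurrentPendingPublishMessages setting comes from this PR's
// description; everything else is illustrative.
public class PendingPublishLimiter {

    private final long maxPendingMessages;               // 0 = feature disabled
    private final LongAdder pendingMessages = new LongAdder();

    public PendingPublishLimiter(long maxConcurrentPendingPublishMessages) {
        this.maxPendingMessages = maxConcurrentPendingPublishMessages;
    }

    // Called on the write path when a publish is handed to the managed ledger.
    public void onPublishStart() {
        pendingMessages.increment();
    }

    // Called when the bookie add-entry completes (success or failure).
    public void onPublishComplete() {
        pendingMessages.decrement();
    }

    // Polled from a background thread, never on the per-message path,
    // because LongAdder.sum() is comparatively expensive.
    public boolean shouldPausePublishes() {
        return maxPendingMessages > 0 && pendingMessages.sum() >= maxPendingMessages;
    }
}
```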

@merlimat
Contributor

@rdhabalia there is already a mechanism in place. PTAL at #7406.

@rdhabalia
Contributor Author

#7406 manages broker-wide throttling at the IO-thread level, and we can still reproduce the above issue with the #7406 fix.
This PR manages the back-pressure at the broker level rather than at the thread level and keeps the broker stable in all scenarios. It also doesn't use a mutex, so it shouldn't create contention in the write path.
So, I think we will need this PR for the OOM issues we are facing frequently.

@merlimat
Contributor

@rdhabalia The change in #7406 is to avoid any contention in the write path and to impose a global max.

@rdhabalia
Contributor Author

The change in #7406 is to avoid any contention in the write path and to impose a global max.

I have gone through the #7406 PR but, as I said, it distributes the max global bytes across separate IO threads, which has its own disadvantage. For example, 1 GB across 30 IO threads means one IO thread can only use 33 MB, so as a user I have to create multiple connections and hope they will be served by other IO threads in order to get higher throughput.
That doesn't happen in this PR, as all connections see one global count without much contention. Please let me know if you see any contention or other issue with this PR, because I think we would prefer to use a global count across all connections to handle the back-pressure.

@rdhabalia
Contributor Author

/pulsarbot run-failure-checks

@merlimat
Contributor

eg: 1 GB across 30 IO threads means one IO thread can only use 33 MB, so as a user I have to create multiple connections and hope they will be served by other IO threads to get higher throughput.

A couple of notes:

  • You won't get higher throughput beyond a certain amount of memory (eg: 30 MB)
  • If you have 16 cores available, you would also have >> 1 GB of direct memory

Finally, the number of messages/s is not a good proxy measure for the amount of memory required in the broker

@rdhabalia
Contributor Author

Finally, the number of messages/s is not a good proxy measure for the amount of memory required in the broker

Yes, I will add a restriction based on the number of bytes as well, along with the number of messages. However, my argument about #7406 is that I don't want to restrict throughput per thread; that forces users to create multiple connections to get higher throughput, which will definitely not work for us in the long term.
So, can you please let me know if you see any major concern with this PR? If not, I can add the byte-based restriction.

@codelipenghui
Contributor

Yes, I will add a restriction based on the number of bytes as well, along with the number of messages.

Is it the same as #6178 does?

@rdhabalia
Contributor Author

Is it the same as #6178 does?

Yes, but #6178 uses a volatile counter, which creates contention in the write path; that is why @merlimat tried to remove it in #7406 by using thread-locals. The only downside of #7406 is that it imposes the restriction per IO thread, which can force users to create multiple connections to achieve higher throughput. I believe this PR addresses both issues, and we can reuse the buffer config introduced in #6178.

@merlimat
Contributor

I don't want to restrict throughput per thread; that forces users to create multiple connections to get higher throughput.

@rdhabalia It does not impact the throughput once you have a minimum amount of memory per thread.

@rdhabalia
Contributor Author

@merlimat we need a fix for the OOM soon. I don't see any issue with this PR, but if we still want to move forward with #7406, then please confirm and we can take that as the fix. However, we should then document the disadvantages of this PR to avoid a similar PR being proposed in the future with the same fix (because, based on past experience, such things always happen).

@jiazhai
Member

jiazhai commented Jul 14, 2020

Pinged @codelipenghui for #7406.

@rdhabalia
Contributor Author

We keep seeing this issue again and again in our environment; it impacts multiple brokers at the same time and they crash. Can we review the PR, or let us know if anyone has any concerns about it?

@rdhabalia
Contributor Author

I will try one more time to comment on this PR. We have been facing this issue in production for a long time. Does this PR need to come from a specific part of the community to be approved and merged?

@eolivelli
Contributor
+1

@merlimat
Contributor

@rdhabalia Have you tried #7406, which is already in 2.8? I still think we shouldn't add contended counters. We have validated that the previous OOMs are not happening anymore in 2.8.

The changes in #7406 can still be improved to use a token bucket to distribute capacity across the threads, in which case the contention will be amortized away. Although, in practice, I haven't seen the skew across threads be a significant problem.

@rdhabalia
Contributor Author

We had applied this PR's (#7499) patch internally and it was working fine, but after upgrading the Pulsar version without this change, our brokers started crashing again.

The main issue with #7406, which I mentioned in a previous comment as well, is that it distributes the max global bytes across separate IO threads, which has its own disadvantage. For example, 1 GB across 30 IO threads means one IO thread can only use 33 MB, so as a user I have to create multiple connections and hope they will be served by other IO threads to get higher throughput. In a multi-tenant environment this is really painful: a critical user starts facing high latency due to a per-thread bottleneck even though the broker has resources (they are just allocated to other threads), and at that point we have to manually unload topics on the server side to spread new connections across different threads and hope for a uniform connection distribution. #7406 doesn't make our life easier when we are managing Pulsar in a multi-tenant environment.

@merlimat
Contributor

I don't think that applies well to a multi-tenant env. In a multi-tenant broker, the load is usually well spread across all IO threads. This would be more of an issue on a single-tenant, single-use-case cluster with very large throughput and a tiny number of partitions.

Also, I'm not yet convinced that it would pose any reduction in throughput in the above scenario in particular. E.g., 30 MB per thread will allow a decent amount of traffic (and, to my previous point, a broker with 30 cores should probably be assigned a lot more than 1 GB of direct memory, more likely at least 32 GB, which would leave this at 1 GB of buffer per core).

Even then, I still believe we should go with the thread-local approach, dynamically adjusting the per-thread quotas.

@rdhabalia
Contributor Author

by dynamically adjusting per-thread quotas.

How do we teach the broker to adjust per-thread quotas dynamically?

@merlimat
Contributor

How do we teach the broker to adjust per-thread quotas dynamically?

Most rate-limiting implementations are based on variations of the token bucket: https://en.wikipedia.org/wiki/Token_bucket

E.g., even for the per-namespace rate limiting we're doing it in a distributed fashion.

In this case, the point is that each thread starts with a quota, and any unused portion is up for grabs by the other threads. The main difference is that we don't have to pay for contention each time we try to send one message; rather, the quotas are adjusted at discrete intervals.
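
To make the idea concrete, here is a rough sketch (not the #7406 implementation; all names and the rebalancing strategy are made up) of per-thread quotas with periodic rebalancing: each IO thread checks only its own counter on the hot path, and a scheduled task redistributes the global limit across threads, so contention is paid once per interval rather than once per message.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only; names and the rebalancing strategy are hypothetical.
class ThreadQuota {
    volatile long quotaBytes;   // written only by the rebalancer
    long usedBytes;             // touched only by the owning IO thread
                                // (cross-thread visibility is ignored for brevity)

    boolean tryAcquire(long bytes) {
        if (usedBytes + bytes > quotaBytes) {
            return false;       // this thread is over its current share
        }
        usedBytes += bytes;
        return true;
    }
}

class QuotaRebalancer {
    private final List<ThreadQuota> quotas;
    private final long globalLimitBytes;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    QuotaRebalancer(List<ThreadQuota> quotas, long globalLimitBytes) {
        this.quotas = quotas;
        this.globalLimitBytes = globalLimitBytes;
    }

    void start() {
        // Quotas are adjusted at discrete intervals, not per message.
        scheduler.scheduleAtFixedRate(this::rebalance, 1, 1, TimeUnit.SECONDS);
    }

    private void rebalance() {
        // Naive strategy: every thread keeps a small base share, and the rest
        // of the global limit is granted in proportion to last interval's usage.
        long baseShare = globalLimitBytes / (quotas.size() * 2);
        long remaining = globalLimitBytes - baseShare * quotas.size();
        long totalUsed = Math.max(1, quotas.stream().mapToLong(q -> q.usedBytes).sum());
        for (ThreadQuota q : quotas) {
            long bonus = remaining * q.usedBytes / totalUsed;
            q.quotaBytes = baseShare + bonus;
            q.usedBytes = 0;
        }
    }
}
```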

@rdhabalia
Contributor Author

@merlimat

I still think we shouldn't add contended counters. We have validated that previous OOMs are not happening anymore in 2.8. Even then, I still believe we should go with thread-local approach, by dynamically adjusting per-thread quotas.

This PR uses a LongAdder counter instead of a contended atomic counter, which should avoid contention in the write path. The benefit of this approach is predictable throttling at the broker level rather than at the thread level. My worry is that #7406 definitely has the uniform-resource-distribution issue, and dynamic per-thread quota adjustment would eventually converge on the same model, just in a more complex way.

@merlimat
Contributor

merlimat commented Oct 1, 2021

@merlimat

I still think we shouldn't add contended counters. We have validated that previous OOMs are not happening anymore in 2.8. Even then, I still believe we should go with thread-local approach, by dynamically adjusting per-thread quotas.

This PR uses a LongAdder counter instead of a contended atomic counter, which should avoid contention in the write path. The benefit of this approach is predictable throttling at the broker level rather than at the thread level. My worry is that #7406 definitely has the uniform-resource-distribution issue, and dynamic per-thread quota adjustment would eventually converge on the same model, just in a more complex way.

@rdhabalia the problem with this implementation is that it does not guarantee that the max limit (in terms of messages or bytes) is respected, since the check is done in the background every few seconds (given that LongAdder.sum() is expensive). It's very possible that a traffic spike will cause an OOM between the checks.

@rdhabalia
Contributor Author

since the check is done in the background every few seconds (given that LongAdder.sum() is expensive). It's very possible that a traffic spike will cause an OOM between the checks.

It actually runs every second, and the ticker interval can be made configurable to make sure the broker doesn't exceed the limit at any point. In fact, 1 second is a reasonable default most of the time, because the load will not spike enough to consume 100% of the memory within one second, but in any case it can be configurable. Does that make sense?

(given that LongAdder.sum() is expensive)

Yes, it happens in a separate thread, similar to how stats and other throttling use cases use LongAdder for aggregation. So it won't impact write-path latency.
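
For illustration, a minimal sketch of this kind of background check, assuming hypothetical names (the actual patch may differ): a scheduled task calls LongAdder.sum() once per tick and flips a cheap volatile flag that the IO threads read, so the expensive sum never runs on the per-message path.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch of the periodic throttle check described above.
public class PendingPublishChecker {

    private final LongAdder pendingMessages;   // incremented/decremented on the write path
    private final long maxPendingMessages;
    private volatile boolean throttled;        // read cheaply by IO threads

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public PendingPublishChecker(LongAdder pendingMessages, long maxPendingMessages) {
        this.pendingMessages = pendingMessages;
        this.maxPendingMessages = maxPendingMessages;
    }

    public void start(long tickMillis) {
        // LongAdder.sum() runs only here, once per tick.
        scheduler.scheduleAtFixedRate(() -> {
            throttled = maxPendingMessages > 0
                    && pendingMessages.sum() >= maxPendingMessages;
            // A real implementation would pause/resume reads on producer
            // connections (e.g. via Netty's autoRead) based on this flag.
        }, tickMillis, tickMillis, TimeUnit.MILLISECONDS);
    }

    public boolean isThrottled() {
        return throttled;
    }
}
```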

@merlimat
Contributor

merlimat commented Oct 1, 2021

since the check is done in the background every few seconds (given that LongAdder.sum() is expensive). It's very possible that a traffic spike will cause an OOM between the checks.

It actually runs every second, and the ticker interval can be made configurable to make sure the broker doesn't exceed the limit at any point. In fact, 1 second is a reasonable default most of the time, because the load will not spike enough to consume 100% of the memory within one second, but in any case it can be configurable. Does that make sense?

In the PR, the check for when to pause is done every 5 seconds, and the check for when to resume is done every 1 second.

But, irrespective of that, my point is that there's no way to guarantee an upper bound on the memory being used. With this approach, it will always go over the quota. Even a 1-second interval at 500 MB/s can mean a lot of memory above the quota.
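
To put rough numbers on that concern (using the figures mentioned above): at an incoming publish rate of 500 MB/s, a 1-second check interval can admit roughly 500 MB beyond the configured limit before the throttle engages, and a 5-second pause check can admit up to about 2.5 GB.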

@github-actions

github-actions bot commented Mar 4, 2022

@rdhabalia: Thanks for your contribution. For this PR, do we need to update the docs?
(The PR template contains info about docs, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks.)
