Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Limit data production from receivers to avoid backpressure scenarios #29410

Open
wbh1 opened this issue Nov 20, 2023 · 19 comments
Open

Limit data production from receivers to avoid backpressure scenarios #29410

wbh1 opened this issue Nov 20, 2023 · 19 comments

Comments

@wbh1
Copy link
Contributor

wbh1 commented Nov 20, 2023

Component(s)

receiver/filelog, receiver/journald

Is your feature request related to a problem? Please describe.

This feature request is based on the FluentBit feature surrounding buffering described here: https://docs.fluentbit.io/manual/administration/backpressure

In our environment (and I'm sure many others), the destinations we send telemetry data to apply some known rate limits to the rate at which we are able to transmit data. For example, we operate a Loki cluster that applies a per-stream rate limit of XMB/s. However, there is currently no way (that I know of, at least) in otelcol to apply any sort of throttling to the amount of data produced [without losing data like through sampling].

In our specific use case, we're using the filelog receiver. Under heavy load, some systems may exceed the aforementioned rate limit in the amount of data they're producing. However, there is no good way to communicate this back to the data producer. This is because we observe the rate limits on our gateways and there's currently no way to limit the data coming into the gateway, although #6908 probably solves this.

Regardless, that rate limiter will just shift the problem of getting a clogged-up queue from the gateway to the agent. Instead, we'd rather be able to limit the speed at which data is produced to prevent either the agent or the gateway from getting backed up with retries.

Describe the solution you'd like

For receivers where it makes sense (e.g. filelog, journald, basically anything that is reading from an external data source instead of having data pushed to it), there could be a way to specify a maximum ingestion rate. I think this should be configured per-receiver, but I'm sure there's an argument to be made that it should be a processor.

Measuring by size is preferable, but you could also limit by a count of data points (e.g. log lines). When this limit is reached, ingestion is paused on the receiver until the next interval. At the next interval, the receiver will resume ingestion at the same offset where it left off so there is no data loss.

The receiver is expected to be allowed to fall behind, but this is an acceptable tradeoff in order to ensure that -- eventually -- all data will be present. This is in contrast to the current system in which backpressure scenarios can result in loss of data due to exhausting retries.

Describe alternatives you've considered

The alternatives I've considered are outlined above, but all result in different types of backpressure scenarios just shifting where it occurs to different places in a pipeline. To my knowledge, the only way to prevent backpressure is to either scale up your exporters' destination(s) to accept data faster (not always in control of users) or to throttle the production of data.

Additional context

Some receivers like journald don't currently have a system to track their progress in receiving data the way that filelog does with offsets. Instead, the journald receiver constantly receives data by tailing the journal. In such a case, pausing ingestion would still cause memory to balloon. This would then trigger the memory_limiter, if configured, and data would still be dropped. Probably not desirable, but that's a separate issue for implementing that sort of logic in that receiver.

@wbh1 wbh1 added enhancement New feature or request needs triage New item requiring triage labels Nov 20, 2023
Copy link
Contributor

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@djaglowski
Copy link
Member

My understanding is that we expect these receivers to block when downstream components are not ready to consume data. @dmitryax, isn't this what was achieved with #20864?

@wbh1
Copy link
Contributor Author

wbh1 commented Dec 7, 2023

I would view this feature request as supplementary to the linked PR.

It is still useful to explicitly limit the production of data without needing to encounter backpressure and trigger retries. It would help to avoid potential loss of data due to retries being exhausted in a situation where (data_produced_in_interval / upstream_rate_limit) > max_possible_retries, and reduce the load on the receiving system when you already know the limits of it in advance.

Additionally, relying on the exporter to trigger the slowdown is not feasible in all architectures. For example, if you're sending from collector -> otel gateway -> destination and the destination is the one rate-limiting data ingestion, the collector will remain unaware of errors since -- from its perspective -- the otel gateway successfully received the data.

@djaglowski
Copy link
Member

I think you have some good points here, that backpressure should not necessarily come from exporters only.

I'm thinking it would make the most sense as a per-receiver config, along the lines of consumerretry, which can be embedded into any receiver config. I'd be curious to here what @dmitryax thinks of this.

Copy link
Contributor

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@github-actions github-actions bot added the Stale label Feb 12, 2024
@sumo-drosiek
Copy link
Member

For journald I wonder if there is a way to rely on max system PIPE size, so if we don;t read from stdout fast enough, journalctl will wait until we start again.
If not, this change may require reading from journald manually 🤔

@wbh1
Copy link
Contributor Author

wbh1 commented Feb 13, 2024

Yeah... I don't think that we'd be able to keep using the -f flag to journalctl. Instead, I think we'd need to keep track of the __CURSOR value from the last read log line and then resume reading from that cursor position on the next read.

@atoulme
Copy link
Contributor

atoulme commented Mar 30, 2024

Please look at the new exporterhelper batch sender here: https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/batch_sender.go it implements the ability to apply backpressure back to receivers.

@atoulme atoulme removed the needs triage New item requiring triage label Mar 30, 2024
Copy link
Contributor

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

Pinging code owners:

See Adding Labels via Comments if you do not have permissions to add labels yourself.

@github-actions github-actions bot added the Stale label May 30, 2024
Copy link
Contributor

This issue has been closed as inactive because it has been stale for 120 days with no activity.

@github-actions github-actions bot closed this as not planned Won't fix, can't repro, duplicate, stale Jul 29, 2024
@sumo-drosiek
Copy link
Member

Can we reopen this? It's still valid

@matthewmodestino
Copy link

matthewmodestino commented Aug 6, 2024

Definitely still valid and will become a HUGE detractor for OTel when more and more log focused people catch wind. I have raised this multiple times as a major issue in the fundamental logic of handling enterprise logs.

This doesnt even need to be as dynamic as a rate limit (tho that sounds awesome).

We have been pausing the input flow for like 20 years at Splunk, and all we do is pause reading and wait for the downstream to show its accepting data again. Pretty sure collectors like beats and the mentioned fluent* do the same.

Way easier for a user to add disk and adjust log rotation ( dont get me started on k8s ridiculous file rotation defaults) logic to extend the time they can experience congestion or outage.

Picking up logs when you have nowhere to send them balloons memory usage and is just not wise when the logs are already on durable disk.

This is sorely needed or its gunna be some tough times for folks adopting OTel in high stakes enviros for logs. Spoiler alert...already has for some massive regulated customers.

@djaglowski
Copy link
Member

Thanks for emphasizing this issue @matthewmodestino.

Picking up logs when you have nowhere to send them balloons memory usage and is just not wise when the logs are already on durable disk.

I think this is the key point but it's not clear to me how it should work. Back pressure in the data pipeline is a partial and imperfect solution, because as @wbh1 described there may be negative consequences downstream, or simply because downstream does not apply back pressure (e.g. asynchronous components).

We have been pausing the input flow for like 20 years at Splunk, and all we do is pause reading and wait for the downstream to show its accepting data again.

Am I understanding correctly that this is similar to back pressure, except once failures begin the receiver pauses entirely and waits for a direct communication that it should resume? I can see how that would work well, but this would mean direct communication between components, which may require some new architecture. Do you have any thoughts on how we should accomplish this?

I wonder if this needs to be a global framework for the collector, complimentary to the backpressure mechanism described in #6908 (comment). Maybe it could work something like this:

  • Receivers may describe themselves as "pausable" via a new receiver factory option.
  • Exporters, during the normal course of operations, may at any point report a "backoff event". When reporting this, they are given a callback to report a return to normal operation.
  • When the collector receives a backoff event from an exporter, it notifies all pausable receivers.
  • When an exporter calls a backoff callback, the collector checks if there are no other exporters in a backoff state, and if not then it notifies all pausable receivers to resume.

There would be some details to work out here but I'm curious if others think this may be worth exploring further.

cc: @open-telemetry/collector-approvers @open-telemetry/collector-contrib-approvers


Some more detailed thoughts on this design:

  • We could in theory use the component graph to determine exactly which receivers are actually upstream of an exporter and pause only those receivers.
  • "Pausing" needs to be defined better.
    • It could just mean Shutdown and later Start are called on the receiver, but for simplicity I believe we currently state that component instances may only be started once.
    • A better way is probably that pausable recievers must implement Pause and Resume methods. (And if so, maybe they don't even need to register since we could just check for an interface.)

@matthewmodestino
Copy link

matthewmodestino commented Aug 9, 2024

I think we have found a way forward working with my colleague @atoulme at Splunk and using a bunch of work from over the last 9 months originating here

We basically have set both the filelog reciver, and the exporter to retry on failure, and set the timer to 0 to retry forever, and remove the batch processor, which has been moved into the exporter, allowing the filelog reciever to "pause" when it recieves errors from downstream (exporter).

Won't hijack this issue further, as the original ask, while similar is likely more about a "throttle", Which would also be cool (similar to Splunk MaxKBps limit setting) ...but glad to see we are getting closer to a more enterprise grade answer, and at least OP may have a config option to at least try not to drop data if receiving throttling messages etc...

Still a ton of testing to do, but glad we seeing it get a bit closer.

@juergen-kaiser-by
Copy link

We have a similar feature request/use case I would liket o add here. If wished, I can create an own issue for that.

We run an observability backend (Elasticsearch) shared by many teams and services (1000s). The services run in kubernetes clusters and we want to collect the logs of all pods. Problem: If a service/pod becomes very noisy for some reason, it can burden the backend so much that all other teams feel it. In short: One team can ruin the day for all others.

We would like to limit the effect a single instance or service can have on the observability backend. A solution could be implemented in different ways:

  • A receiver or processor could limit the flow based on some atributes similar to the rate limiter of filebeat. We apply a standardized set of labels to each Kubernetes deployment, so we would work with those.
  • Dropping log lines would be okay (for us) since we can not assume that a noisy service stops being noisy soon.
  • If rate limiting happens then we and our users should be able to see it. A single metric ingested into some (other) pipeline would suffice. Let us choose a set of attributes the metric should have (copied) from the rate limited logs so that we can map it to the affected service. In our case, we would like to have our standard pod labels copied to the metric.

@djaglowski
Copy link
Member

@juergen-kaiser-by, I think a new issue makes sense to evaluate the various options. If there's consensus this should be solved in the receiver, we can take a closer look on this issue.

@wbh1
Copy link
Contributor Author

wbh1 commented Sep 15, 2024

@juergen-kaiser-by - I think your use case may be covered by the open issue at #6908

@juergen-kaiser-by
Copy link

@juergen-kaiser-by - I think your use case may be covered by the open issue at #6908

Depends on the implementation details and how the collector is deployed to srape the logs (https://opentelemetry.io/docs/kubernetes/collector/components/#filelog-receiver). We need rate limiting based on pod attributes. We could rate limit based on pod uid (which can be parsed in the filelog receiver from the file name) but I do not see how we can properly report happening throttling if the collector is deployed in the recommended mode (Daemonset) because we need other pod information (labels) to map the throtteling back to a deployment and/or service.

If we deploy the collector as a sidecar, we could inject that data into the collector config via env vars but that is an "advanced configuration" and potentially wastes recources as we run thousands of pods in our clusters.

@juergen-kaiser-by
Copy link

Will create a new ticket for this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

6 participants